Tuesday
We are using PySpark and notice that after many transformations/aggregations/joins of the data, the execution time of even simple tasks (count, display, union of 2 tables, ...) becomes very slow, even with small data (e.g. 4,000 rows). It seems to be related to the size of the DAG.
At first we tried persist() on intermediate dataframes, but it didn't make much of a difference. Then we tried writing the intermediate dataframes to a catalog, and that did make a big difference; however, it doesn't seem right to be forced to create permanent tables for these. We also tried checkpoint() with the checkpoint directory set to the ephemeral storage of the running cluster (spark.sparkContext.setCheckpointDir("file:/tmp/checkpoints")), which seems to do the trick, but we don't know the pros and cons of that approach or whether there is a better way to manage all of this.
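For reference, here is roughly the pattern we ended up with (the table and variable names below are just placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Checkpoint directory on the cluster's ephemeral local storage
spark.sparkContext.setCheckpointDir("file:/tmp/checkpoints")

df = spark.read.table("some_source_table")
# ... many joins / aggregations / withColumn transformations ...

# Truncate the lineage so later actions don't replay the whole plan
df = df.checkpoint(eager=True)
df.count()  # simple actions are fast again after this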
It's kind of weird that pulling the data into memory with pandas gives better performance than using Spark. We would like the community's input on what our options are, given what we have tried so far.
Wednesday
Hello @Jonathan_
Good day!
To answer first: I think, as you mentioned, pandas is faster than Spark here because Spark works better with big data; on a small dataset the distributed overhead outweighs the benefit.
Wednesday - last edited Wednesday
@Jonathan_ I think @Khaja_Zaffer has raised some great points.
With Spark, broadcast joins ensure the smaller table is held in memory on all of the worker nodes, which should certainly help with speed. Shuffling is always going to take some time, and things like joins & aggregations will trigger a shuffle.
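For example, a minimal sketch of a broadcast join (the DataFrame names are placeholders):

from pyspark.sql.functions import broadcast

# Hint Spark to ship the small lookup table to every executor
# instead of shuffling both sides of the join
result = large_df.join(broadcast(small_lookup_df), on="id", how="left")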
@Khaja_Zaffer whilst Pandas uses a single node (great point about it not having distributed overhead), Pandas API on Spark can be configured to scale: https://spark.apache.org/pandas-on-spark/ which is really cool. Perhaps there are some benchmarks available for Spark vs Pandas vs Pandas API on Spark 😏? Would be interesting to see 🙂
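For anyone curious, a minimal sketch of the Pandas API on Spark (available in Spark 3.2+; the path and column names here are illustrative):

import pyspark.pandas as ps

# pandas-like syntax, but the work is distributed by Spark under the hood
psdf = ps.read_parquet("/path/to/data")
summary = psdf.groupby("category")["amount"].sum()
print(summary.head())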
@Jonathan_ when you say "become very slow", how slow is slow? Is the development process slow? The job execution? The compute startup? Which compute are you using? How many people are accessing the compute when you're experiencing the "slow"? Is "slow" in reference to the speed of execution when compared to pandas?
All the best,
BS
Thursday - last edited Thursday
1. I noticed you had tried to persist your result; just a reminder that the dataframe is only stored once an action is performed. So if you would like to store the result in memory, add an action like count() immediately after calling persist(). Also, for faster performance, store the dataframe in deserialized format in memory, as that is faster than the serialized format in memory / on disk.
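A minimal sketch of that pattern (the DataFrame name is a placeholder; StorageLevel.MEMORY_ONLY keeps the partitions deserialized in memory):

from pyspark import StorageLevel

# Mark the DataFrame for in-memory, deserialized storage
intermediate_df = intermediate_df.persist(StorageLevel.MEMORY_ONLY)

# Trigger an action so the persist actually materializes the data
intermediate_df.count()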
2. Take a look at the number of partitions being created; it can be too few or too many. Try to aim for about 2-3 times the total number of cores across all executors.
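For example, a rough way to do this (assuming spark is your SparkSession):

# defaultParallelism is roughly the total number of executor cores
target_partitions = spark.sparkContext.defaultParallelism * 3
df = df.repartition(target_partitions)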
3. As @Khaja_Zaffer mentioned, try enabling AQE; it helps auto-optimize partition counts, shuffle partitions, and the join strategy.
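For example, on Spark 3.x these are the standard AQE settings (recent runtimes may already have them enabled by default):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")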
4. If you are using a UDF, or have a piece of the query that runs outside of Spark's engine, it is going to be slow.
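A small illustration of the difference (the DataFrame and column names are placeholders):

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Python UDF: rows are serialized out to a Python worker and back
to_upper_udf = F.udf(lambda s: s.upper() if s else None, StringType())
slow_df = df.withColumn("name_upper", to_upper_udf("name"))

# Built-in function: stays in the JVM and is optimized by Catalyst
fast_df = df.withColumn("name_upper", F.upper("name"))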
To answer your question about checkpoint(): it writes to the location you specify, not to the disk/memory of the executors. In terms of data locality that location is farther from the executors, so writing the checkpoint will be slower. The pro is that if an executor fails, recovery is faster, because the checkpointed data is not stored in the memory or disk of that executor and the lineage no longer needs to be recomputed. This holds when the deploy mode is client or cluster; if it is local, the executor disk and your system disk are the same, so performance will be the same.
Hope it helps.
yesterday
This is a pretty common issue with PySpark when working on large DAGs with lots of joins and transformations. As the DAG grows, Spark has to maintain a huge execution plan, and performance can drop due to shuffling, serialization, and memory overhead.
A few things you can try to speed it up:
Cache/Checkpoint Strategically: If you have intermediate results that get reused, use cache() or checkpoint() to break the DAG and avoid recomputation.
Optimize Joins: Make sure large tables are partitioned properly and use broadcast() for smaller DataFrames to reduce shuffle.
Reduce Shuffles: Try using coalesce() instead of repartition() when possible, and avoid unnecessary wide transformations (see the sketch just after this list).
Persist Early Results: Write intermediate outputs to disk or a temp table to simplify the DAG before continuing.
Enable Adaptive Query Execution (AQE): If you’re on Spark 3.0+, set spark.sql.adaptive.enabled=true — it helps optimize joins and reduce shuffle size automatically.
Monitor with Spark UI: Check the stages that take the longest and look for skewed data or uneven partitioning.
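To illustrate the coalesce() vs repartition() point above, a minimal sketch (the partition counts are just examples):

# repartition() does a full shuffle to redistribute the data evenly
evenly_spread = df.repartition(64)

# coalesce() only merges existing partitions and avoids a full shuffle,
# so it is cheaper when you just need fewer partitions
fewer_partitions = df.coalesce(8)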
In short, the key is to simplify your DAG and break it into smaller, optimized stages. Spark performs best when it doesn’t have to keep track of too many transformations at once.