Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Slow PySpark operations after a long DAG with many joins and transformations

Jonathan_
New Contributor III

We are using PySpark and notice that after many transformations/aggregations/joins, the execution time of simple tasks (count, display, union of 2 tables, ...) becomes very slow even though the data is small (e.g. 4,000 rows). It seems to be related to the DAG.

At first we tried persist() on intermediate DataFrames, but it didn't make a big difference. Then we tried writing the intermediate DataFrames to a catalog, and that does make a big difference, but it doesn't seem logical to be forced to create permanent tables for these. We also tried checkpoint() with the checkpoint directory set to the running cluster's ephemeral storage (spark.sparkContext.setCheckpointDir("file:/tmp/checkpoints")); it seems to do the trick, but we don't know its pros and cons, or whether there is a better way to manage all of this.
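To illustrate, the checkpoint approach we are currently testing looks roughly like this (the table and column names are only placeholders):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# checkpoint directory on the cluster's ephemeral local disk
spark.sparkContext.setCheckpointDir("file:/tmp/checkpoints")

df = spark.table("some_catalog.some_schema.events")  # placeholder table name

# ... many joins / aggregations / transformations build up a long lineage ...
df = df.groupBy("id").agg(F.count("*").alias("n"))

# checkpoint() materializes the data and truncates the lineage (eager by default)
df = df.checkpoint()

df.count()  # simple actions are fast again once the lineage is cut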

It's kind of weird that pulling the data into memory with pandas gives better performance than using Spark. We would like the community's input on our options, given what we have tried so far.

7 REPLIES

Khaja_Zaffer
Contributor III

Hello @Jonathan_ 

Good day!

To answer your last point first: as you mentioned, pandas is faster than Spark here because Spark is built for big data and carries scheduling and shuffle overhead on every job.

Pandas runs in-memory on a single machine without this distributed overhead, making it faster for small data.
 
As you mentioned, you have joins and aggregations, so:
Make sure AQE is turned on, use broadcast joins where one side is small, and check whether you can convert string values (such as dates) into ints so that sort-based aggregations are avoided and hash aggregations are used.
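For example, roughly like this in a Databricks notebook, where the spark session already exists (table and column names are just placeholders):

from pyspark.sql.functions import broadcast

# make sure Adaptive Query Execution is on (default in recent Databricks runtimes)
spark.conf.set("spark.sql.adaptive.enabled", "true")

large_df = spark.table("sales")         # placeholder
small_df = spark.table("dim_country")   # placeholder

# broadcasting the small side avoids a shuffle for this join
joined = large_df.join(broadcast(small_df), on="country_id", how="left")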
Also check which join strategy works better for you, shuffle hash join or sort-merge join. Here is an article for you:
 
The default block size is usually 128 MB; see if you can use repartition or coalesce to decrease the number of partitions so that many small blocks are merged and better optimizations can be observed.
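Something like this (the partition count of 8 and the key name are only examples):

print(df.rdd.getNumPartitions())          # inspect the current partitioning

df_small = df.coalesce(8)                 # fewer, larger partitions; no full shuffle
df_even  = df.repartition(8, "join_key")  # full shuffle, but rows co-located by key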
 
Try to minimize shuffle operations and filter the data as early as possible.
 
Turn on spark.eventLog.enabled=true, collect the logs, and raise a ticket with Azure Databricks so they can check internally what the issue is, what actually went wrong, and the nature of the problem.
 
If you are working with Delta files, maybe OPTIMIZE can compact all the small files into bigger ones.
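For example, on a Delta table (the table name is a placeholder):

# compact many small files of a Delta table into bigger ones (Delta Lake / Databricks)
spark.sql("OPTIMIZE my_catalog.my_schema.my_table")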
 
 
I am also open to other solutions from contributors. 

BS_THE_ANALYST
Esteemed Contributor II

@Jonathan_ I think @Khaja_Zaffer has raised some great points. 

With Spark, broadcast joins ensure the smaller table is held in memory on all of the worker nodes, which should certainly help with speed. Shuffling is always going to take some time, and things like joins & aggregations will prompt a shuffle.

@Khaja_Zaffer whilst Pandas uses a single node (great point about it not having a distributed overhead), Pandas API on Spark can be configured to scale: https://spark.apache.org/pandas-on-spark/ which is really cool. Perhaps there's some benchmarks available for Spark vs Pandas vs Pandas API on Spark 😏? Would be interesting to see 🙂
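For anyone curious, switching is mostly an import change, roughly like this (the table name is a placeholder):

import pyspark.pandas as ps

# pandas-like syntax, but the work is distributed by Spark under the hood
psdf = ps.read_table("my_catalog.my_schema.events")
result = psdf.groupby("country")["amount"].sum()
print(result.head())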

@Jonathan_ when you say "become very slow", how slow is slow? Is the development process slow? The job execution? The compute startup? Which compute are you using? How many people are accessing the compute when you're experiencing the "slow"? Is "slow" in reference to the speed of execution when compared to pandas?

All the best,
BS

Prajapathy_NKR
New Contributor

@Jonathan_  

1. I noticed you tried to persist your result; just a reminder that the DataFrame is only materialized when an action is performed. So if you want to keep the result in memory, add an action like count() immediately after persist(). Also, for faster access, store the DataFrame in deserialized format in memory, as that is faster than a serialized format in memory or on disk (see the sketch after this list).

2. Take a look at the number of partitions being created; it can be too low or too high. Try to create roughly 2-3 times the total number of cores across all executors.

3. As @Khaja_Zaffer mentioned, try to enable AQE; it helps auto-optimize partitions, shuffle partitions, and the join strategy.

4. If you are using a UDF, or have a piece of the query that runs outside of Spark, it is going to be slow.
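A rough sketch of points 1 and 2 (values are only illustrative; the deserialized storage level requires Spark 3.2+):

from pyspark import StorageLevel

# point 1: persist and trigger it with an action, otherwise nothing is cached yet
df = df.persist(StorageLevel.MEMORY_AND_DISK_DESER)  # deserialized in memory
df.count()                                           # materializes the cache

# point 2: aim for roughly 2-3x the total number of executor cores
total_cores = spark.sparkContext.defaultParallelism
df = df.repartition(total_cores * 3)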

To answer your question about checkpoint: it writes to the location you specify, not to the disk/memory of the executor. In terms of data locality it is farther away from the executor, so it will be slower. The pro is that if an executor fails, recovery is faster because the information isn't stored in the executor's memory or disk. This holds when the deploy mode is client or cluster; if it is local, the executor disk and your system disk are the same, so performance will be the same.

Hope it helps. 

tarunnagar
New Contributor II
This is a pretty common issue with PySpark when working on large DAGs with lots of joins and transformations. As the DAG grows, Spark has to maintain a huge execution plan, and performance can drop due to shuffling, serialization, and memory overhead.

A few things you can try to speed it up:
  1. Cache/Checkpoint Strategically: If you have intermediate results that get reused, use cache() or checkpoint() to break the DAG and avoid recomputation.
  2. Optimize Joins: Make sure large tables are partitioned properly and use broadcast() for smaller DataFrames to reduce shuffle.
  3. Reduce Shuffles: Try using coalesce() instead of repartition() when possible, and avoid unnecessary wide transformations.
  4. Persist Early Results: Write intermediate outputs to disk or a temp table to simplify the DAG before continuing.
  5. Enable Adaptive Query Execution (AQE): If you’re on Spark 3.0+, set spark.sql.adaptive.enabled=true — it helps optimize joins and reduce shuffle size automatically.
  6. Monitor with Spark UI: Check the stages that take the longest and look for skewed data or uneven partitioning.
In short, the key is to simplify your DAG and break it into smaller, optimized stages. Spark performs best when it doesn’t have to keep track of too many transformations at once.
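For example, something along these lines:

# see how large the physical plan has become
df.explain(mode="formatted")

# break the lineage without configuring a checkpoint directory
# (localCheckpoint keeps the data on the executors, so it is not fault tolerant)
df = df.localCheckpoint()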

 

Jonathan_
New Contributor III

Hi,

We forgot to say that we are using a single-node cluster (E-class with 16 cores). Often in our projects we need to use libraries that work mainly with data in memory. Also keep in mind that we are not referring to large data here.

When we said it was slow, we were not referring to compute startup. At some point, after many data manipulations, Spark was taking around 5 minutes just to do a simple count/display. One time the job was throwing errors like org.apache.spark.SparkException: Multiple failures in stage materialization. We have tried persist and cache, but the only way we found to resolve this was to do a checkpoint() to cut the DAG lineage or to write the table to Unity Catalog. However, it's not clear to us why we have to do this, and whether there is a better way to manage all of that.
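For context, the workaround that currently works for us looks roughly like this (the table name is illustrative):

# writing the intermediate result and reading it back cuts the lineage,
# but it forces us to keep a permanent table around
df.write.mode("overwrite").saveAsTable("catalog.schema.tmp_intermediate")
df = spark.table("catalog.schema.tmp_intermediate")

# checkpoint() to the cluster's ephemeral disk achieves the same without a permanent table
df = df.checkpoint()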

We have not tried modifying the default number of partitions created in Databricks; we know it's possible, but we were thinking that with AQE turned on (in recent runtime releases) we wouldn't have to manage this manually. Do I understand correctly that we have to manage this manually each time?

We have not created UDFs, but we have grouped some data manipulations into a function and assigned the result to a Spark DataFrame. Is there anything to take into consideration about this? Doing a count() or display() of the resulting table was particularly slow (more than 3 minutes).

Regards,

Jonathan

@Jonathan_ Good that you have given some extra information. Based on that, I think there might be a memory issue: since it is a single-node cluster, both the driver and the executor reside on the same machine.

It would be better if you could tell me the RAM that is allocated.  

If your RAM is around 16 GB then take a look at PART A and PART B.

You have 16 cores, out of which 15 will be utilized as executors and 1 for driver.

PART A of memory issue:

Check in the Spark UI what storage memory is allocated under the Executors tab. By default spark.memory.storageFraction is 0.5, which means 50% of the unified memory is used for storage (cache/persist) and 50% for execution (join, groupBy).

For example: if the storage memory is 1 GB, then the execution memory is 1 GB. So if you persist/cache a DataFrame you can see the storage memory being utilized.

Based on that, if you feel your storage isn't being utilized much, try to adjust storageFraction and give more memory to execution.

PART B of memory issue:

The driver also needs memory for scheduling tasks, storing results from the executors, and monitoring.

You could have hit an OOM error if you tried to return more data to the driver than spark.driver.maxResultSize, which is 1 GB by default.

These are something to keep in mind.
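For example, you can check what is currently configured from the notebook (changing these is a cluster-level Spark config, and the values passed here are only the documented defaults used as fallbacks):

# fraction of unified memory reserved for storage (cache/persist); 0.5 is the default
print(spark.conf.get("spark.memory.storageFraction", "0.5"))

# maximum total size of results an action may return to the driver; 1g is the default
print(spark.conf.get("spark.driver.maxResultSize", "1g"))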

Regarding AQE: if you are on Spark 3.0+ it is available, and recent Databricks runtimes enable it by default.

"a simple count/display", if you are new to spark, count and display are action statements. When spark encounters such action statements, it reads the data and runs all your transformations. 

For example: if I have 2 actions, one count and one display, then internally the count reads the data, processes it, and sends the result to the driver, where we see it. When display runs, it again reads the data, does the processing, sends the result to the driver, and shows it. So there will be 2 jobs.
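In code, the difference looks roughly like this:

# without caching, each action re-runs the whole chain of transformations
df.count()    # job 1: read + all transformations
df.display()  # job 2: read + all transformations again (display() is Databricks-specific)

# with caching, the second action reuses the materialized result
df = df.cache()
df.count()    # computes once and fills the cache
df.display()  # served from the cache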

Another issue can be a small-files problem during the read: if your input data consists of lots of small files, more time is spent in IO reading each file. If that is the case, compact them into bigger files.

"Often in our projects we need to used library that works mainly with data in memory", also a remainder, if any process is running out side spark framework, it is running on driver only. And in local mode driver is 1 core and all the process is running on that one core. 

Hope it helps. 🙂 

Jonathan_
New Contributor III

It's a cluster with 128 GB of memory; looking in the Spark UI there is 54 GB of storage memory. Honestly, I don't think it's a memory issue; as I said, it's small data, and if we do a checkpoint at some point and then continue, we don't have the problem afterwards. However, we have not tried manually changing the partitions as pointed out by @Khaja_Zaffer, or redoing the partitioning after many transformations; maybe that can help? At first, the problem we encountered seemed to be due to the DAG trying to track too many transformations at once, as @tarunnagar was pointing out, or maybe a small-files problem when Spark tries to optimize the repartitioning.