Some very simple functions in Pandas on Spark are very slow

Vik1
New Contributor II

I have a pandas-on-Spark dataframe with 8 million rows and 20 columns. It took 3.48 minutes to run df.shape, and df.head also took a long time: 4.55 minutes. By contrast, df.var1.value_counts().reset_index() took only 0.18 seconds.

I am a bit surprised that shape and head, the simplest of the dataframe functions, take this long. I would have assumed that value_counts should take longer, because if var1 values are split across nodes, a data shuffle is needed; shape is a simple count, whereas head is a simple fetch of 5 rows from any node.

Am I doing something wrong? Is there documentation on best practices and guidance on how to use the pandas API on Spark?

4 REPLIES

Hubert-Dudek
Esteemed Contributor III

Be sure to import it as "import pyspark.pandas as ps".
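For example, here is a minimal sketch assuming an existing SparkSession `spark` and an illustrative table name "my_table" (replace with your own source); the timings it prints are only for comparison:

```python
import time

import pyspark.pandas as ps

sdf = spark.table("my_table")      # plain Spark DataFrame
psdf = ps.read_table("my_table")   # same table through the pandas API on Spark

t0 = time.time()
print("Spark count:", sdf.count(), "in", round(time.time() - t0, 2), "s")

t0 = time.time()
print("pandas-on-Spark shape:", psdf.shape, "in", round(time.time() - t0, 2), "s")
```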

Please compare times with similar operations in plain Spark first, as in the sketch above. There can be multiple problems related to the dataset:

  • Look for data skew; some partitions can be huge and some small because of incorrect partitioning. You can use the Spark UI for that, but also debug your code a bit (check getNumPartitions()). SQL sources in particular can divide data unevenly across partitions (there are connector settings such as lowerBound, etc.). You could aim for a number of partitions equal to worker cores multiplied by some factor X (so they are processed step by step in the queue), and while data is being processed your partitions should fit in RAM (see the first sketch after this list).
  • Increase the shuffle partition count: spark.sql.shuffle.partitions defaults to 200; try a bigger value. Ideally, calculate it as the data size divided by the desired partition size (also shown in the first sketch after this list).
  • Increase the driver size to be about twice the executor size (but to find the optimal size, analyze the load: in Databricks, open the Metrics section of the cluster tab, where Ganglia is available, or better yet integrate Datadog with the cluster).
  • Check wide transformations. Such transformations must shuffle data between partitions, so group them to trigger only one shuffle. When a shuffle happens (for example, computing a field's average over the whole dataset requires data from all partitions), partitions move between workers over the network. Workers already hold partitions in memory, and there should be enough working memory for the incoming data; spilling to disk when memory runs out can become the bottleneck (check disk spills in the Spark UI).
  • If you need to filter data, do it right after reading from SQL if possible, so the filter becomes a predicate pushdown and a WHERE clause is added to the SQL query (see the second sketch after this list).
  • Make sure that everything runs in a distributed way, especially UDFs. Use vectorized pandas UDFs so they run on the executors (see the third sketch after this list). Don't use collect(), etc.
  • Sometimes (though I bet it is not the case here), I process big data as a stream, as it is easier with large data sets. In that scenario, you would need Kafka (which can be Confluent Cloud) between SQL and Databricks.
  • Regarding infrastructure, use more workers and check that your ADLS connects through a private link. Monitor save progress in the target folder. You can also use premium ADLS, which is faster.
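A minimal sketch of the partition checks above, assuming an existing SparkSession `spark` and a Spark DataFrame `sdf`; the partition counts are only examples, not recommendations:

```python
# How many partitions does the DataFrame currently have?
print("partitions:", sdf.rdd.getNumPartitions())

# Rough skew check: rows per partition (this scans the data, so use it sparingly).
sizes = sdf.rdd.glom().map(len).collect()
print("rows per partition (min/max):", min(sizes), max(sizes))

# Aim for roughly (worker cores * X) partitions; 64 is only an example.
sdf = sdf.repartition(64)

# Raise the shuffle partition count from the default of 200 if
# data size divided by a healthy partition size calls for it.
spark.conf.set("spark.sql.shuffle.partitions", "400")
```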
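A minimal sketch of pushing a filter down to the SQL source, assuming a JDBC read; the URL, credentials, table name, and filter are placeholders:

```python
sdf = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)        # assumed to be defined elsewhere
    .option("dbtable", "sales")     # illustrative table name
    .option("user", user)           # assumed credentials
    .option("password", password)
    .load()
    .where("year = 2022")           # filtering right after the read lets Spark
)                                   # push it down as a WHERE clause in the SQL query
```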
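And a minimal sketch of a vectorized pandas UDF, assuming a Spark DataFrame `sdf` with a numeric column "var1"; the function and column names are illustrative:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def double_it(v: pd.Series) -> pd.Series:
    # Runs on the executors, one Arrow batch at a time,
    # instead of row by row on the driver.
    return v * 2.0

sdf = sdf.withColumn("var1_doubled", double_it("var1"))
```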

Kaniz
Community Manager

Hi @Wiki, we haven't heard from you since @Hubert Dudek's last response, and I was checking back to see if his suggestions helped you. If you have found a solution, please share it with the community, as it can be helpful to others.

colinsorensen
New Contributor III

Corroborating Vik's experience: head() and shape are extremely slow, along with info().

Hubert provides some suggestions, but I don't think any of them explain why such basic functions aren't performing when other functions can run in half a second.

PeterDowdy
New Contributor II

The reason this is slow is that the pandas API on Spark needs an index column to perform `shape` or `head`. If you don't provide one, pyspark pandas enumerates the entire dataframe to create a default one. For example, given columns A, B, and C in a dataframe `df` with a million rows, `df.pandas_api().head()` will take a long time, but `df.pandas_api(index_col='A').head()` will complete quickly.
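A minimal sketch of that tip, assuming a Spark DataFrame `df` with a column "A" that can serve as an index; the speed difference is indicative, not guaranteed:

```python
slow = df.pandas_api()               # default index: Spark has to enumerate every row
fast = df.pandas_api(index_col="A")  # reuse an existing column as the index

print(slow.head())   # can trigger a full pass over the data
print(fast.head())   # typically returns quickly
```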
