cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Pyspark Dataframes orderby only orders within partition when having multiple worker

dbx-user7354
New Contributor III

I came across a pyspark issue when sorting the dataframe by a column. It seems like pyspark only orders the data within partitions when having multiple worker, even though it shouldn't. 

 

from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import numpy as np

num_rows = 1000000
num_cols = 300

# Create DataFrame mit 1 Million Rows and 300 columns with random data
columns = ["col_" + str(i) for i in range(num_cols)]
data = spark.range(0, num_rows)

# create id column first
data = data.repartition(1) # we need this here so monotonically_increasing_id gives all numbers sequentially
data = data.withColumn("test1", F.monotonically_increasing_id())
data = data.orderBy(F.rand())
data = data.repartition(10)

for col_name in columns:
    data = data.withColumn(col_name, F.rand())

# default sorting which leads to wrong sorting
data = data.orderBy("test1", ascending=False)
test2 = data.select(F.collect_list("test1")).first()[0]
plt.plot(test2, color="red")

# test sorting after repartioning to 1 partition
data2 = data.repartition(1)
data2 = data2.orderBy("test1", ascending=False)
test3 = data2.select(F.collect_list("test1")).first()[0]
plt.plot(test3, color="red")

 

The first plot looks like this:

dbxuser7354_0-1711014288660.png

The second plot looks like this:

dbxuser7354_1-1711014300462.png

 

The second plot (after repartitioning to 1 partition) shows the correct sorting.

 

Is this a known issue? If so it is a pyspark issue or a databricks issue? When having only one worker both plots are correct.

3 REPLIES 3

Kaniz
Community Manager
Community Manager

Hi @dbx-user7354

  • When you use orderBy() or sort(), Spark performs sorting within each partition independently. Consequently, if the data is not evenly distributed across partitions, the overall sorting order may not be as expected.
  • Consider choosing an optimal number of partitions based on your cluster configuration and data size. Ideally, the number of partitions should align with the number of available cores or workers. You can use repartition() with a higher number of partitions (e.g., data.repartition(100)) to achieve better distribution.

dbx-user7354
New Contributor III

Thanks for your quick answer. Where can I find the information that orderby() or sort() is only sorting within the partition? The official doc does not mention this.

MarkusFra
New Contributor II

@Kaniz 

Sorry if I have to ask again, but I am a bit confused by this.

I thought, that pysparks `orderBy()` and `sort()` do a shuffle operation before the sorting for exact this reason. There is another command `sortWithinPartitions()` that does not do that and does a partition wise sorting. I am acutally suprised that `sort()` also only works partition wise. But then: Why does it work on a singleNode Cluster on a partitioned DataFrame?

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.