
PySpark DataFrame orderBy only orders within partitions when using multiple workers

dbx-user7354
New Contributor III

I came across a PySpark issue when sorting a DataFrame by a column. It seems that PySpark only orders the data within partitions when the cluster has multiple workers, even though it shouldn't.

 

from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import numpy as np

num_rows = 1000000
num_cols = 300

# Create a DataFrame with 1 million rows and 300 columns of random data
columns = ["col_" + str(i) for i in range(num_cols)]
data = spark.range(0, num_rows)

# Create the id column first.
# A single partition is needed here so that monotonically_increasing_id() yields consecutive numbers.
data = data.repartition(1)
data = data.withColumn("test1", F.monotonically_increasing_id())
data = data.orderBy(F.rand())
data = data.repartition(10)

for col_name in columns:
    data = data.withColumn(col_name, F.rand())

# Default sorting, which produces the wrong order
data = data.orderBy("test1", ascending=False)
test2 = data.select(F.collect_list("test1")).first()[0]
plt.plot(test2, color="red")

# Test sorting after repartitioning to 1 partition
data2 = data.repartition(1)
data2 = data2.orderBy("test1", ascending=False)
test3 = data2.select(F.collect_list("test1")).first()[0]
plt.plot(test3, color="red")

 

The first plot looks like this:

dbxuser7354_0-1711014288660.png

The second plot looks like this:

dbxuser7354_1-1711014300462.png

 

The second plot (after repartitioning to 1 partition) shows the correct sorting.

 

Is this a known issue? If so, is it a PySpark issue or a Databricks issue? With only one worker, both plots are correct.

5 REPLIES

Thanks for your quick answer. Where can I find documentation stating that orderBy() or sort() only sorts within each partition? The official docs do not mention this.

MarkusFra
New Contributor III

@Retired_mod 

Sorry to ask again, but I am a bit confused by this.

I thought that PySpark's `orderBy()` and `sort()` perform a shuffle before sorting for exactly this reason. There is another command, `sortWithinPartitions()`, that skips the shuffle and sorts partition by partition. I am actually surprised that `sort()` would also only work partition-wise. But then: why does it work on a single-node cluster with a partitioned DataFrame?

NandiniN
Databricks Employee

The orderBy function in PySpark is expected to perform a global sort, which involves shuffling the data across partitions to ensure that the entire DataFrame is sorted. This is different from sortWithinPartitions, which only sorts data within each partition.
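The difference between the two semantics can be modeled in plain Python, without Spark. This is only a sketch: the function names `sort_within_partitions`, `global_sort`, and `flatten` are hypothetical, each inner list stands in for one partition, and the model reproduces the result of a global sort (ordered, non-overlapping ranges), not how Spark actually chooses its range boundaries.

```python
# Plain-Python model of the two sort semantics; no Spark required.
# Each inner list stands in for one partition of a DataFrame column.

def sort_within_partitions(partitions, descending=False):
    """Sort each partition independently; rows never move between partitions."""
    return [sorted(p, reverse=descending) for p in partitions]


def global_sort(partitions, descending=False):
    """Model the result of a global sort: rows end up in ordered, non-overlapping ranges."""
    rows = sorted((x for p in partitions for x in p), reverse=descending)
    n = max(len(partitions), 1)
    size = -(-len(rows) // n)  # ceiling division
    return [rows[i * size:(i + 1) * size] for i in range(n)]


def flatten(partitions):
    """Read the partitions back one after another, in partition order."""
    return [x for p in partitions for x in p]


partitions = [[5, 1, 9], [7, 3, 8], [2, 6, 4]]

local = flatten(sort_within_partitions(partitions, descending=True))
globally = flatten(global_sort(partitions, descending=True))

print(local)     # sorted runs per partition, not sorted overall
print(globally)  # one fully descending sequence
```

In this model, a plot of `local` would show a sawtooth of descending runs, while a plot of `globally` would show a single descending line.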

Let me try your program and understand the results further.

NandiniN
Databricks Employee

Both before and after the repartition, I see the same results for orderBy.
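To compare results across clusters, it may help to check the collected order directly instead of plotting the output of `collect_list`. The PySpark documentation describes `collect_list` as non-deterministic, because the order of the collected values depends on the row order, which may change after a shuffle. Whether that explains the plots in this thread is an assumption and has not been confirmed here. The sketch below uses a hypothetical helper, `first_disorder`, on a plain Python list; the commented lines show how the list might be obtained from the `data` DataFrame in the original post.

```python
def first_disorder(values, descending=False):
    """Return the index of the first element that breaks the order, or None if sorted."""
    for i in range(1, len(values)):
        prev, cur = values[i - 1], values[i]
        out_of_order = cur > prev if descending else cur < prev
        if out_of_order:
            return i
    return None


# In a notebook, the values could be collected row by row (assumes the
# sorted `data` DataFrame and the "test1" column from the original post):
#   values = [row["test1"] for row in data.select("test1").collect()]
# A small stand-in list is used here so the sketch runs without Spark.
values = [9, 8, 7, 3, 6, 5]

print(first_disorder(values, descending=True))        # 4, because 6 follows 3
print(first_disorder([9, 8, 7, 6], descending=True))  # None
```

If the helper returns `None` for the collected rows but the `collect_list` plot still looks unsorted, that would point at the aggregation step rather than at `orderBy` itself.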

NemesisMF
New Contributor II

@NandiniN 

Did you try it on a cluster with multiple workers? Which runtime and Spark version did you use?

It might be worth testing with Runtime 13.3; then we would know whether it has been fixed in the meantime.

I found this on Stack Overflow. It seems someone had a similar problem: https://stackoverflow.com/questions/55860388/pyspark-dataframe-orderby-partition-level-or-overall

There is also a very old Hive bug ticket that was never resolved. I'm not sure whether it is connected:

https://issues.apache.org/jira/browse/HIVE-10417
