cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Pyspark Dataframes orderby only orders within partition when having multiple worker

dbx-user7354
New Contributor III

I came across a pyspark issue when sorting the dataframe by a column. It seems like pyspark only orders the data within partitions when having multiple worker, even though it shouldn't. 

 

from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import numpy as np

num_rows = 1000000
num_cols = 300

# Create DataFrame mit 1 Million Rows and 300 columns with random data
columns = ["col_" + str(i) for i in range(num_cols)]
data = spark.range(0, num_rows)

# create id column first
data = data.repartition(1) # we need this here so monotonically_increasing_id gives all numbers sequentially
data = data.withColumn("test1", F.monotonically_increasing_id())
data = data.orderBy(F.rand())
data = data.repartition(10)

for col_name in columns:
    data = data.withColumn(col_name, F.rand())

# default sorting which leads to wrong sorting
data = data.orderBy("test1", ascending=False)
test2 = data.select(F.collect_list("test1")).first()[0]
plt.plot(test2, color="red")

# test sorting after repartioning to 1 partition
data2 = data.repartition(1)
data2 = data2.orderBy("test1", ascending=False)
test3 = data2.select(F.collect_list("test1")).first()[0]
plt.plot(test3, color="red")

 

The first plot looks like this:

dbxuser7354_0-1711014288660.png

The second plot looks like this:

dbxuser7354_1-1711014300462.png

 

The second plot (after repartitioning to 1 partition) shows the correct sorting.

 

Is this a known issue? If so it is a pyspark issue or a databricks issue? When having only one worker both plots are correct.

3 REPLIES 3

Kaniz_Fatma
Community Manager
Community Manager

Hi @dbx-user7354

  • When you use orderBy() or sort(), Spark performs sorting within each partition independently. Consequently, if the data is not evenly distributed across partitions, the overall sorting order may not be as expected.
  • Consider choosing an optimal number of partitions based on your cluster configuration and data size. Ideally, the number of partitions should align with the number of available cores or workers. You can use repartition() with a higher number of partitions (e.g., data.repartition(100)) to achieve better distribution.

Thanks for your quick answer. Where can I find the information that orderby() or sort() is only sorting within the partition? The official doc does not mention this.

MarkusFra
New Contributor III

@Kaniz_Fatma 

Sorry if I have to ask again, but I am a bit confused by this.

I thought, that pysparks `orderBy()` and `sort()` do a shuffle operation before the sorting for exact this reason. There is another command `sortWithinPartitions()` that does not do that and does a partition wise sorting. I am acutally suprised that `sort()` also only works partition wise. But then: Why does it work on a singleNode Cluster on a partitioned DataFrame?

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!