cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Order of a dataframe is not perserved after calling cache() and limit()

jerry-xu-sa
New Contributor II

Here are the simple steps to reproduce it. Note that col "foo" and "bar" are just redundant cols to make sure the dataframe doesn't fit into a single partition.

// generate a random df
val rand = new scala.util.Random
val df = (1 to 3000).map(i => (rand.nextInt, "foo" * 50000, "bar" * 50000)).toSeq.toDF("col1", "foo", "bar").orderBy(desc("col1")).cache()
 
// this is the correct results
df.orderBy(desc("col1")).limit(5).show()
/* outputs of benchmark: 
 +----------+--------------------+--------------------+
|      col1|                 foo|                 bar|
+----------+--------------------+--------------------+
|2146781842|foofoofoofoofoofo...|barbarbarbarbarba...|
|2146642633|foofoofoofoofoofo...|barbarbarbarbarba...|
|2145715082|foofoofoofoofoofo...|barbarbarbarbarba...|
|2136356447|foofoofoofoofoofo...|barbarbarbarbarba...|
|2133539394|foofoofoofoofoofo...|barbarbarbarbarba...|
+----------+--------------------+--------------------+
*/
 
// however it seems not true when I call limit().rdd.collect on the cached dataframe without order by again, show() and take() returns the correct results however rdd.collect doesn't
df.limit(5).select("col1").show()
/* this is correct
+----------+
|      col1|
+----------+
|2146781842|
|2146642633|
|2145715082|
|2136356447|
|2133539394|
+----------+
*/
df.select("col1").take(5)
/*this is also correct
Array[org.apache.spark.sql.Row] = Array([2146781842], [2146642633], [2145715082], [2136356447], [2133539394])
*/
 df.limit(5).select("col1").rdd.collect
/* this is incorrect
Array[org.apache.spark.sql.Row] = Array([2146781842], [2146642633], [2145715082], [2133000691], [2130499969])
*/

Is it expected that calling cache() will break the ordering of rows? also what is causing the difference between limit(5).rdd.collect vs take(5) and limit(5).show()? according to the spark sql documentation it is supposed to be deterministic. what am I missing here?

" LIMIT

 clause is used to constrain the number of rows returned by the SELECT statement. In general, this clause is used in conjunction with ORDER BY to ensure that the results are deterministic. "

// attached are my cluster setup
// Runtime: 11.3 LTS (scala 2.12, spark 3.3.0)
// 2 r5.xlarge + 1 r5.2xlarge
spark.sql.autoBroadcastJoinThreshold -1
spark.driver.extraJavaOptions -Xss16m
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.sql.parquet.fs.optimized.committer.optimization-enabled true
spark.sql.files.ignoreCorruptFiles true
spark.hadoop.fs.s3a.acl.default BucketOwnerFullControl
spark.hadoop.mapreduce.use.parallelmergepaths true
spark.driver.maxResultSize 64g
spark.hadoop.fs.s3a.canned.acl BucketOwnerFullControl
spark.sql.shuffle.partitions 1200
spark.network.timeout 180
spark.sql.broadcastTimeout 30000
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.executor.extraJavaOptions -Xss16m
spark.dynamicAllocation.executorIdleTimeout 1s
spark.default.parallelism 1200
spark.port.maxRetries 70
spark.dynamicAllocation.schedulerBacklogTimeout 1s

2 REPLIES 2

Anonymous
Not applicable

@Jerry Xu​ :

The behavior you are seeing is expected, as the cache() operation does not guarantee the order of the rows in the cached DataFrame. When you call limit(5) on a cached DataFrame without an explicit orderBy(), the Spark execution engine will select any 5 rows that are available in the cache, which may not necessarily be the first 5 rows in the original order. When you call show() or take() after orderBy() and limit(), the Spark execution engine will perform a new query and generate a new execution plan that includes the orderBy() clause, which will enforce the correct ordering of the rows. When you call rdd.collect() on a cached DataFrame without an explicit orderBy(), the Spark execution engine will use the cached data directly, which may not be ordered correctly.

val df = (1 to 3000).map(i => (rand.nextInt, "foo" * 50000, "bar" * 50000)).toSeq.toDF("col1", "foo", "bar").orderBy(desc("col1")).cache()

This will ensure that the rows are cached in the correct order. Hope this helps!

Anonymous
Not applicable

Hi @Jerry Xu​ 

Thank you for your question! To assist you better, please take a moment to review the answer and let me know if it best fits your needs.

Please help us select the best solution by clicking on "Select As Best" if it does.

Your feedback will help us ensure that we are providing the best possible service to you. Thank you!

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.