Here are the simple steps to reproduce it. Note that the columns "foo" and "bar" are just filler columns to make sure the dataframe does not fit into a single partition.
// generate a random df
import org.apache.spark.sql.functions.desc  // needed if not already in scope (spark-shell imports it)
import spark.implicits._
val rand = new scala.util.Random
val df = (1 to 3000).map(i => (rand.nextInt, "foo" * 50000, "bar" * 50000)).toSeq.toDF("col1", "foo", "bar").orderBy(desc("col1")).cache()
// this gives the correct results
df.orderBy(desc("col1")).limit(5).show()
/* output:
+----------+--------------------+--------------------+
| col1| foo| bar|
+----------+--------------------+--------------------+
|2146781842|foofoofoofoofoofo...|barbarbarbarbarba...|
|2146642633|foofoofoofoofoofo...|barbarbarbarbarba...|
|2145715082|foofoofoofoofoofo...|barbarbarbarbarba...|
|2136356447|foofoofoofoofoofo...|barbarbarbarbarba...|
|2133539394|foofoofoofoofoofo...|barbarbarbarbarba...|
+----------+--------------------+--------------------+
*/
// However, this no longer holds when I call limit() on the cached dataframe without ordering it again:
// show() and take() still return the correctly ordered rows, but limit(5).rdd.collect does not.
df.limit(5).select("col1").show()
/* this is correct
+----------+
| col1|
+----------+
|2146781842|
|2146642633|
|2145715082|
|2136356447|
|2133539394|
+----------+
*/
df.select("col1").take(5)
/* this is also correct
Array[org.apache.spark.sql.Row] = Array([2146781842], [2146642633], [2145715082], [2136356447], [2133539394])
*/
df.limit(5).select("col1").rdd.collect
/* this is incorrect
Array[org.apache.spark.sql.Row] = Array([2146781842], [2146642633], [2145715082], [2133000691], [2130499969])
*/
Is it expected that calling cache() breaks the ordering of rows? Also, what causes the difference between limit(5).rdd.collect and take(5) / limit(5).show()? According to the Spark SQL documentation, it is supposed to be deterministic. What am I missing here?
" LIMIT
clause is used to constrain the number of rows returned by the SELECT statement. In general, this clause is used in conjunction with ORDER BY to ensure that the results are deterministic. "
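In case it helps, this is how the physical plan and the RDD lineage of the failing path can be dumped (just a diagnostic sketch on the same dataframe as above; I have not pasted the output here):
// hypothetical diagnostic: inspect what the limited query compiles to,
// and the lineage of the RDD that .rdd.collect actually runs
df.limit(5).select("col1").explain()
println(df.limit(5).select("col1").rdd.toDebugString)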
// attached is my cluster setup
// Runtime: 11.3 LTS (Scala 2.12, Spark 3.3.0)
// 2 r5.xlarge + 1 r5.2xlarge
spark.sql.autoBroadcastJoinThreshold -1
spark.driver.extraJavaOptions -Xss16m
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.sql.parquet.fs.optimized.committer.optimization-enabled true
spark.sql.files.ignoreCorruptFiles true
spark.hadoop.fs.s3a.acl.default BucketOwnerFullControl
spark.hadoop.mapreduce.use.parallelmergepaths true
spark.driver.maxResultSize 64g
spark.hadoop.fs.s3a.canned.acl BucketOwnerFullControl
spark.sql.shuffle.partitions 1200
spark.network.timeout 180
spark.sql.broadcastTimeout 30000
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.executor.extraJavaOptions -Xss16m
spark.dynamicAllocation.executorIdleTimeout 1s
spark.default.parallelism 1200
spark.port.maxRetries 70
spark.dynamicAllocation.schedulerBacklogTimeout 1s
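The settings above are applied through the cluster configuration. For completeness, a minimal sketch of the equivalent programmatic form (the app name is made up, and only a few of the settings are shown):
import org.apache.spark.sql.SparkSession

// sketch only: a subset of the cluster settings above, set on a SparkSession builder
val sparkSession = SparkSession.builder()
  .appName("limit-cache-repro")  // hypothetical app name
  .config("spark.sql.shuffle.partitions", "1200")
  .config("spark.default.parallelism", "1200")
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .config("spark.driver.maxResultSize", "64g")
  .getOrCreate()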