mala
New Contributor III

This issue was caused by Spark's parallelization, which doesn't guarantee that the same rows are assigned to the same partition on every run.

I was able to resolve this by making sure the same data is always assigned to the same partitions:

# DataFrames are immutable: both calls return a new DataFrame, so keep the result
df = df.repartition(num_partitions, "ur_col_id")

df = df.sortWithinPartitions("ur_col_id")
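
To illustrate why partitioning on a key column helps, here is a toy pure-Python model of the two steps. It is only a sketch, not Spark's implementation: the helper names (assign_partitions, sort_within_partitions, keys_by_partition), the sample rows, and the use of zlib.crc32 as the hash are all assumptions made for the example. The point it shows is that when the partition is derived from the key alone, the same key lands in the same partition no matter what order the rows arrive in.

```python
import zlib
from collections import defaultdict


def assign_partitions(rows, num_partitions, key):
    """Toy stand-in for repartition(num_partitions, key): route rows by a hash of the key."""
    partitions = defaultdict(list)
    for row in rows:
        # crc32 is stable across runs; Python's built-in hash() is salted for strings.
        pid = zlib.crc32(str(row[key]).encode("utf-8")) % num_partitions
        partitions[pid].append(row)
    return dict(partitions)


def sort_within_partitions(partitions, key):
    """Toy stand-in for sortWithinPartitions(key): sort each partition independently."""
    return {pid: sorted(part, key=lambda r: r[key]) for pid, part in partitions.items()}


def keys_by_partition(partitions, key):
    """Reduce each partition to the ordered list of key values it holds."""
    return {pid: [r[key] for r in part] for pid, part in partitions.items()}


# Hypothetical sample data: ten rows spread over five distinct ids.
rows = [{"ur_col_id": i % 5, "value": v} for i, v in enumerate("abcdefghij")]
shuffled = list(reversed(rows))  # same rows, different arrival order

layout_a = sort_within_partitions(assign_partitions(rows, 3, "ur_col_id"), "ur_col_id")
layout_b = sort_within_partitions(assign_partitions(shuffled, 3, "ur_col_id"), "ur_col_id")

# Both arrival orders produce the same key placement.
print(keys_by_partition(layout_a, "ur_col_id") == keys_by_partition(layout_b, "ur_col_id"))
```

In this model the partition id depends only on the value of ur_col_id, which is the property the repartition call above relies on; the per-partition sort then fixes the order of ids inside each partition.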
