Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to understand what dropDuplicates is doing?

cchalc
New Contributor III

We smashed our heads against this one for a while, and though I think it's more of a Spark question than a Databricks one, I'd like to get your thoughts on it. Essentially the gist is this:

  • We select into a DF from a delta table
  • We display the DF and see 2 records:
    • one with 'someColumn' = 1 and 'otherColumn' = 'A'
    • one with 'someColumn' = 1 and 'otherColumn' = 'B'
  • We run dropDuplicates(['someColumn']) on the DF and store the result in DF2
  • We run display(DF2) and see the record with 'otherColumn' = 'A'
  • We run display(DF2.select(col('otherColumn'))) and see the record with 'otherColumn' = 'B'

Somehow, even though we're just selecting a column from DF2 in the second case, the select appears to trigger a re-evaluation of dropDuplicates, and that re-evaluation resolves in a different order: after the select we see the other row than the one we saw before it. Reproducible results are important to us here, because the value of otherColumn is used as an identifier to drop the duplicate dependents from another table, based on the duplicate parents that were dropped here.
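For context, here is a minimal sketch of the steps above (the table and column names are placeholders, not our real ones):

from pyspark.sql.functions import col

DF = spark.read.table('some_delta_table')  # two rows: someColumn = 1 with otherColumn 'A' and 'B'
DF2 = DF.dropDuplicates(['someColumn'])

display(DF2)                             # shows the row with otherColumn = 'A'
display(DF2.select(col('otherColumn')))  # shows otherColumn = 'B' instead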

Some other things that seem to affect the results are:

  • Inclusion or removal of a group by on the original query
  • Inclusion or removal of some unused columns in the original query
  • Selecting another column along with the ‘otherColumn’
  • Persisting DF2 before selecting on it

Altering those conditions seems to affect which record we see after selecting, but we haven't seen anything consistent enough to provide an explanation.

As far as I can hypothesize, dropDuplicates is a lazy transformation that may end up running multiple times as the DF is used throughout the pipeline, and its behavior is largely undefined as far as which row it keeps. Running a window function to assign a row number in the original query and filtering on that instead of using dropDuplicates seems to provide consistent behavior, but we find it hard to accept that dropDuplicates is largely unusable in this case without some sort of confirmation of when and why.
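For reference, a minimal sketch of that window-function workaround; ordering by otherColumn is our arbitrary but fixed tiebreaker, not anything prescribed:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# the explicit orderBy fixes which row wins, unlike dropDuplicates
w = Window.partitionBy('someColumn').orderBy(col('otherColumn'))
DF2 = (DF.withColumn('rn', row_number().over(w))
         .filter(col('rn') == 1)
         .drop('rn'))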

The questions I have for you are:

  • Have you seen anything like this before and, if so, can you provide any insight on it?
  • Is this purely a behavior of Spark or the underlying frameworks, or could it be related to the Databricks runtime or something we're doing?

2 REPLIES

AmanSehgal
Honored Contributor III

This happens because of lazy evaluation.

On assigning DF2 = DF.dropDuplicates(['someColumn']), no action is performed in the background. In fact, no data is loaded into DF2 unless an action is performed on it.

Next, on executing an action such as display() or show(), the dropDuplicates transformation is performed on DF and the result is stored in DF2. That's how lazy evaluation works.

Each time you invoke an action, a fresh transformation is performed in the background. And because there are two different rows with someColumn value '1', it'll pick the otherColumn value arbitrarily.

The lazy evaluation approach helps Spark filter out or skip unnecessary transformations when an action is performed.

display(DF2.select(col('otherColumn')))  # action 1: triggers one evaluation of dropDuplicates
display(DF2)                             # action 2: triggers a separate, independent evaluation

How to avoid it?

Well, if you want to always get the same value for otherColumn, then you should include another column to break the tie deterministically. A very common solution is to use timestamps - like an insert or update timestamp associated with the record.
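As a sketch of that idea, assuming a hypothetical updatedAt timestamp column, a grouped max over a struct keeps the newest row per key deterministically (Spark compares struct fields left to right):

from pyspark.sql.functions import col, max as max_, struct

DF2 = (DF.groupBy('someColumn')
         .agg(max_(struct('updatedAt', 'otherColumn')).alias('latest'))
         .select('someColumn',
                 col('latest.otherColumn').alias('otherColumn')))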

However, if you don't have any such column and you want to make sure that the same value is used throughout the code every time DF2 is referenced, then use the persist() operation. It will keep the contents of the dataframe across operations after the first time it is computed.

from pyspark.storagelevel import StorageLevel

# deduplicate once, then cache the result so later actions reuse the same rows
DF2 = DF1.drop_duplicates(['someCol']).persist(StorageLevel.MEMORY_AND_DISK)
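One caveat worth noting: persist() is itself lazy, so it helps to trigger an action once right after it; that first action computes dropDuplicates a single time and pins the result for everything downstream. For example:

DF2.count()      # first action materializes the cache
# ... all later actions on DF2 reuse the cached rows ...
DF2.unpersist()  # release the cache when you're done with DF2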

cchalc
New Contributor III

Great answer @Aman Sehgal. I also received another answer from @Ryan Chynoweth, which I will paste here:

1) Have you seen anything like this before and if so, can you provide any insight on it?

Yes, this does happen due to the lazy execution of Spark and due to the dataset being distributed. Specifically, dropDuplicates essentially keeps whichever row is returned first, and that can change if the rows are on different nodes and/or in more than one partition. There is a sort operation that happens, so reproducibility is not guaranteed with this function.

2) Is this purely a behavior of Spark or the underlying frameworks, or could this be related to the Databricks runtime or something we're doing?

This behavior is not specific to Spark (or MPPs in general), but more related to the way dropDuplicates was created. Similarly, the df.first() function simply takes whichever row is returned first, and in a distributed dataset that row can vary.
