Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to get all occurrences of duplicate records in a PySpark DataFrame based on specific columns?

Mado
Valued Contributor II

Hi,

I need to find all occurrences of duplicate records in a PySpark DataFrame. Following is the sample dataset:

# Prepare Data
data = [
    ("A", "A", 1),
    ("A", "A", 2),
    ("A", "A", 3),
    ("A", "B", 4),
    ("A", "B", 5),
    ("A", "C", 6),
    ("A", "D", 7),
    ("A", "E", 8),
]
 
# Create DataFrame
columns = ["col_1", "col_2", "col_3"]
df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show(truncate=False)

When I try the following code:

primary_key = ['col_1', 'col_2']
 
duplicate_records = df.exceptAll(df.dropDuplicates(primary_key))

The output will be:

[image: screenshot of the resulting duplicate_records DataFrame]

As you can see, I don't get all occurrences of the duplicate records based on the primary key, because one instance of each duplicate is kept in "df.dropDuplicates(primary_key)" and is therefore subtracted by exceptAll. The 1st and the 4th records of the dataset should also be in the output.

Any idea how to solve this issue?
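
The behaviour described in the question can be reproduced without Spark. The sketch below is plain Python, not PySpark: the Counter subtraction is only a stand-in for exceptAll's multiset difference, and it assumes dropDuplicates happens to keep the first row of each key, which matches the output described above but is not something Spark guarantees. The names key, deduplicated, and all_duplicates are made up for illustration.

```python
from collections import Counter

rows = [("A", "A", 1), ("A", "A", 2), ("A", "A", 3),
        ("A", "B", 4), ("A", "B", 5),
        ("A", "C", 6), ("A", "D", 7), ("A", "E", 8)]


def key(row):
    """Primary key of a row: (col_1, col_2)."""
    return row[:2]


# Stand-in for dropDuplicates(primary_key): keep one row per key.
# Assumption: the first row seen is the one that survives.
first_per_key = {}
for row in rows:
    first_per_key.setdefault(key(row), row)
deduplicated = list(first_per_key.values())

# Stand-in for exceptAll: multiset difference of full rows.
# The survivor of every key is subtracted, including keys that had duplicates.
remaining = Counter(rows) - Counter(deduplicated)
except_all_result = sorted(remaining.elements())

# What the question actually asks for: every row whose key occurs more than once.
key_counts = Counter(key(r) for r in rows)
all_duplicates = [r for r in rows if key_counts[key(r)] > 1]

print(except_all_result)
print(all_duplicates)
```

The first list is missing one row for each duplicated key, while the second keeps every occurrence, which is why counting rows per key is the more direct route.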

1 ACCEPTED SOLUTION

daniel_sahal
Esteemed Contributor

Hi,

Getting the non-duplicated records and doing a 'left_anti' join against them should do the trick.

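# Keys that occur exactly once, i.e. the non-duplicated records
not_duplicate_records = df.groupBy(primary_key).count().where('count = 1').drop('count')
 
# The anti-join removes those keys; show() returns None, so call it separately
duplicate_records = df.join(not_duplicate_records, on=primary_key, how='left_anti')
duplicate_records.show()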

3 REPLIES

Shalabh007
Honored Contributor

@Mohammad Saber, how about using a window function, like below?

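from pyspark.sql import Window, functions as F

# Count the rows that share each primary key, then keep only keys seen more than once
windowSpec = Window.partitionBy(*primary_key)
df.withColumn("primary_key_count", F.count("*").over(windowSpec)).filter(F.col("primary_key_count") > 1).drop("primary_key_count").show(truncate=False)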

NhatHoang
Valued Contributor II

Hi,

In my experience, if you use dropDuplicates(), Spark keeps an arbitrary row from each group of duplicates, and which one it keeps is not guaranteed.

Therefore, you should define explicit logic for which of the duplicated rows to remove.
