How to get all occurrences of duplicate records in a PySpark DataFrame based on specific columns?

Mado
Valued Contributor II

Hi,

I need to find all occurrences of duplicate records in a PySpark DataFrame. Following is the sample dataset:

# Prepare Data
data = [("A", "A", 1), \
    ("A", "A", 2), \
    ("A", "A", 3), \
    ("A", "B", 4), \
    ("A", "B", 5), \
    ("A", "C", 6), \
    ("A", "D", 7), \
    ("A", "E", 8), \
  ]
 
# Create DataFrame
columns= ["col_1", "col_2", "col_3"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

(screenshot: the DataFrame schema and the eight rows printed by df.show())

When I try the following code:

primary_key = ['col_1', 'col_2']
 
duplicate_records = df.exceptAll(df.dropDuplicates(primary_key))

The output will be:

(screenshot: the exceptAll output, which is missing one row from each duplicate group)

As you can see, I don't get all occurrences of the duplicate records for the primary key, because df.dropDuplicates(primary_key) keeps one row from each duplicate group, and exceptAll then removes that row from the result. The 1st and the 4th records of the dataset should also be in the output.

Any idea how to solve this issue?

1 ACCEPTED SOLUTION


daniel_sahal
Esteemed Contributor

Hi,

Getting the non-duplicated records and doing a 'left_anti' join should do the trick.

# Keys that appear exactly once (the non-duplicated records)
not_duplicate_records = df.groupBy(primary_key).count().where('count = 1').drop('count')

# The left_anti join keeps every row of df whose key is not in that set
duplicate_records = df.join(not_duplicate_records, on=primary_key, how='left_anti')
duplicate_records.show()

(screenshot: all five rows whose (col_1, col_2) combination is duplicated)
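An equivalent variant (my own sketch, not part of the original reply, assuming the same df and primary_key as above) is to keep the keys that occur more than once and inner-join them back onto the DataFrame:

# Sketch: keep the keys that occur more than once, then inner-join them back onto df
duplicate_keys = df.groupBy(primary_key).count().where('count > 1').drop('count')
df.join(duplicate_keys, on=primary_key, how='inner').show(truncate=False)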


3 REPLIES


Shalabh007
Honored Contributor

@Mohammad Saber how about using a window function like the one below?

from pyspark.sql import Window
from pyspark.sql import functions as F

# Count the rows in each primary-key group and keep only rows from groups with more than one
windowSpec = Window.partitionBy(*primary_key)
df.withColumn("primary_key_count", F.count("*").over(windowSpec)).filter(F.col("primary_key_count") > 1).drop("primary_key_count").show(truncate=False)
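(For the sample data above, this returns all five rows whose (col_1, col_2) combination appears more than once, i.e. the same result as the accepted solution.)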

NhatHoang
Valued Contributor II

Hi,

In my experience, if you use dropDuplicates(), Spark keeps an arbitrary row from each group of duplicates.

Therefore, you should define your own logic to decide which of the duplicated rows to keep.
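For example, here is a minimal sketch of such logic (my own illustration, assuming you want to keep the row with the smallest col_3 in each duplicate group):

from pyspark.sql import Window
from pyspark.sql import functions as F

# Deterministic alternative to dropDuplicates(): explicitly keep the row with the
# smallest col_3 in each (col_1, col_2) group instead of an arbitrary one
w = Window.partitionBy('col_1', 'col_2').orderBy(F.col('col_3').asc())
deduplicated = df.withColumn('rn', F.row_number().over(w)).filter('rn = 1').drop('rn')
deduplicated.show(truncate=False)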
