Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to get all occurrences of duplicate records in a PySpark DataFrame based on specific columns?

Mado
Valued Contributor II

Hi,

I need to find all occurrences of duplicate records in a PySpark DataFrame. Following is the sample dataset:

# Prepare Data
data = [("A", "A", 1), \
    ("A", "A", 2), \
    ("A", "A", 3), \
    ("A", "B", 4), \
    ("A", "B", 5), \
    ("A", "C", 6), \
    ("A", "D", 7), \
    ("A", "E", 8), \
  ]
 
# Create DataFrame
columns = ["col_1", "col_2", "col_3"]
df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show(truncate=False)


When I try the following code:

primary_key = ['col_1', 'col_2']
 
duplicate_records = df.exceptAll(df.dropDuplicates(primary_key))

The output will be:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|A    |A    |2    |
|A    |A    |3    |
|A    |B    |5    |
+-----+-----+-----+

As you can see, I don't get all occurrences of the duplicate records based on the primary key, because df.dropDuplicates(primary_key) keeps one instance from each duplicate group, and exceptAll then removes that instance from the result. The 1st and the 4th records of the dataset should also be in the output.
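
For reference, a quick way to see which rows survive the dropDuplicates step (which row is kept per key is not guaranteed, so this is only illustrative):

# Illustrative check: dropDuplicates keeps a single (arbitrary) row per primary key;
# exceptAll then removes exactly those kept rows from df
df.dropDuplicates(primary_key).show(truncate=False)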

Any idea how to solve this issue?

1 ACCEPTED SOLUTION

Accepted Solutions

daniel_sahal
Esteemed Contributor

Hi,

Getting the non-duplicated records and then doing a 'left_anti' join should do the trick.

# Keys that occur exactly once are not duplicated
not_duplicate_records = df.groupBy(primary_key).count().where('count = 1').drop('count')
 
# Anti-join removes those keys, leaving every occurrence of the duplicated records
duplicate_records = df.join(not_duplicate_records, on=primary_key, how='left_anti')
duplicate_records.show()

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|A    |A    |1    |
|A    |A    |2    |
|A    |A    |3    |
|A    |B    |4    |
|A    |B    |5    |
+-----+-----+-----+


3 REPLIES


Shalabh007
Honored Contributor

@Mohammad Saber how about using a window function like below?

from pyspark.sql import Window, functions as F
 
windowSpec = Window.partitionBy(*primary_key)
 
# Keep every row whose primary key occurs more than once
df.withColumn("primary_key_count", F.count("*").over(windowSpec)).filter(F.col("primary_key_count") > 1).drop("primary_key_count").show(truncate=False)

NhatHoang
Valued Contributor II

Hi,

In my experience, if you use dropDuplicates(), Spark keeps an arbitrary row from each group of duplicates.

Therefore, you should define a rule for which duplicated rows to keep and which to remove, for example as sketched below.
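
A minimal sketch of that idea (assuming, purely for illustration, that you want to keep the row with the smallest col_3 in each group):

from pyspark.sql import Window, functions as F
 
# Rank rows within each primary-key group by col_3 (illustrative ordering choice)
w = Window.partitionBy(*primary_key).orderBy(F.col("col_3").asc())
 
deduplicated = (
    df.withColumn("rn", F.row_number().over(w))
      .filter(F.col("rn") == 1)
      .drop("rn")
)
deduplicated.show(truncate=False)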
