Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Performance Issue with MinHash + Approx Similarity Join for Fuzzy Duplicate Detection

dsoat
New Contributor

Hello Community,

We have implemented fuzzy matching logic in Databricks using the MinHash algorithm along with the approxSimilarityJoin API to identify duplicate records in a large dataset. The logic works correctly, but we are facing a significant performance bottleneck: execution time is much longer than expected, especially as the dataset grows.

Details:

  • Purpose: Detect duplicates based on approximate string matching.

  • Approach:

    1. Tokenize and hash the strings.

    2. Apply MinHash for similarity signatures.

    3. Use approxSimilarityJoin to compare records.

  • Issue: The job runs extremely slowly on datasets with millions of records.

Questions:

  1. Are there any known performance tuning strategies or optimizations for MinHash and approxSimilarityJoin in Spark/Databricks?

  2. Would alternative approaches (e.g., LSH variants, embeddings with ANN search) offer better scalability?

  3. Any configuration or partitioning recommendations to improve execution speed?

Any guidance, best practices, or code optimization tips would be greatly appreciated.

Thank you.
The source code is attached below.

# Imports and Spark Session (same)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, size, least, greatest  # explicit imports avoid shadowing Python builtins
from pyspark.storagelevel import StorageLevel
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, HashingTF, MinHashLSH
from pyspark.sql import functions as F
# Spark session
spark = SparkSession.builder.appName("FuzzyDuplicateDetection").getOrCreate()
# Parameters — adjust as needed
SIM_DISTANCE    = 0.2        # Jaccard distance threshold for LSH (controls candidates)
POST_FILTER_SIM = 0.95       # Final similarity threshold (strict filtering)
SOURCE_TABLE    = 'salesforce_dev.account'
# --- Corrected and Optimized Code ---
# Step 1: Read input data and preprocess
df_raw = deduplication_input  # assumed to be a DataFrame defined earlier in the notebook; SOURCE_TABLE is not read here
# df_raw.display()
pattern = r"(?i)\btest\b"
df_raw = df_raw.filter(~col("name").rlike(pattern))
df_raw = df_raw.na.drop(subset=["name","kyb_addressline__c"])
print(df_raw.count())
# Step 2: Clean and prepare entity fields
df_clean = (
    df_raw.fillna({'name':'', 'kyb_addressline__c':''})
          .withColumn('name_c',  F.lower(F.regexp_replace('name', r'[^\p{L}\p{N}\s]', '')))
          .withColumn('addr_c',  F.lower(F.regexp_replace('kyb_addressline__c',r'[^\p{L}\p{N}\s]', '')))
          .withColumn('entity',  F.concat_ws(' ', 'name_c', 'addr_c'))
)
# Step 3: Blocking key generation (retained for potential future advanced blocking if needed, but not used in current LSH join)
df_processed = df_clean.withColumn(
    'block_key',
    F.sha2(F.concat_ws(' ', 'name_c', 'addr_c'), 256).substr(1, 4) # First 4 chars of SHA2 hash
)
# Step 4: Build LSH pipeline with improved parameters
tokenizer   = RegexTokenizer(inputCol="entity", outputCol="tokens", pattern="[^\\p{L}\\p{N}]+")
hashing_tf  = HashingTF(inputCol="tokens", outputCol="features", numFeatures=20000) # Increased features
minhash     = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=10) # Increased hash tables
hl_pipeline = Pipeline(stages=[tokenizer, hashing_tf, minhash])
# Step 5: Fit and transform the entire dataset once, with token size filter
pipeline_model = hl_pipeline.fit(df_processed)
df_vect = pipeline_model.transform(df_processed) \
                      .filter(size(col('tokens')) > 0) \
                      .filter(size(col('tokens')) <= 100) \
                      .persist(StorageLevel.MEMORY_AND_DISK)
# df_vect.select("name","name_c","entity", "tokens").display()
# Get the trained LSH model from the fitted pipeline
minhash_model = pipeline_model.stages[-1]
# Step 6: Perform the distributed similarity join (NO LOOP HERE)
# The approxSimilarityJoin is a distributed operation designed for this.
matches = (
    minhash_model.approxSimilarityJoin(df_vect, df_vect, SIM_DISTANCE, distCol='jaccardDist')
                 .filter('datasetA.id < datasetB.id') # Avoid self-joins + duplicates
                 .filter(f'1 - jaccardDist > {POST_FILTER_SIM}') # Final strict filtering
                 .selectExpr(
                     'datasetA.id as id_a',
                     'datasetA.name as name_a',
                     'datasetA.name_c as name_c_a',
                     'datasetA.kyb_addressline__c as address_a',
                     'datasetB.id as id_b',
                     'datasetB.name as name_b',
                     'datasetB.name_c as name_c_b',
                     'datasetB.kyb_addressline__c as address_b',
                     '1 - jaccardDist as similarity'
                 )
)
# Step 7: Final deduplication and persist
# The join filter already guarantees id_a < id_b, so (id_a, id_b) is the
# canonical pair order and a single dropDuplicates is enough. Rebuilding
# dupes_all from `matches` afterwards would discard the persisted DataFrame.
dupes_all = matches.dropDuplicates(['id_a', 'id_b']) \
                   .persist(StorageLevel.MEMORY_AND_DISK)
# # Show result
# dupes_all.display()
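# Step 8: Show the result, then unpersist the intermediate DF
# persist() is lazy, so calling unpersist() before the first action would
# drop the cache before the join ever gets to use it.
df = dupes_all
df.display()
df_vect.unpersist()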

 

2 REPLIES

Louis_Frolio
Databricks Employee

Hey @dsoat, out of curiosity:

1. Expectations

You mentioned it’s “taking longer than expected.” What were your expectations, and what are you comparing them against?

2. Data

• What is the total size of the dataset after initial filtering (in GB or rows)?

• Are there any unusual data skews or highly frequent tokens/values in the matching columns?

• What is the estimated cardinality of the blocking keys or generated tokens?

• How many columns are involved in the LSH and join operations, and what are their data types?

• How many unique records participate in the self-join phase?

• Are there known outliers or unusually large input files that could create unbalanced partitions?

3. Cluster Configuration

• How many nodes are in the cluster, and what are their specs (memory, vCPUs per node)?

• How many executors, executor cores, and how much executor memory is allocated?

• What value is set for spark.sql.shuffle.partitions, and has it been tuned for your workload?

• Is dynamic resource allocation enabled (spark.dynamicAllocation.enabled), or is allocation static?

• Have you observed executor/driver OOM errors, excessive GC, or shuffle spill/writes in the Spark UI?

• Is Adaptive Query Execution (AQE) enabled in the Databricks workspace?

4. Environment

• What Spark and Databricks Runtime versions are you using?

• Are you working with Delta Lake tables, and if so, are they optimized/compacted?

• Is this workload running as a scheduled job or as ad-hoc analysis?
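
If it helps with the cluster configuration questions above, a small helper like this can collect the relevant settings in one go. It is only a sketch: `collect_settings` and `SETTING_KEYS` are names made up for illustration, and the stand-in dictionary exists so the snippet runs without a cluster. In a notebook you would pass `spark.conf.get` instead.

```python
# Standard Spark config keys related to the questions above.
SETTING_KEYS = [
    "spark.sql.shuffle.partitions",
    "spark.sql.adaptive.enabled",
    "spark.sql.adaptive.skewJoin.enabled",
    "spark.dynamicAllocation.enabled",
]


def collect_settings(get, keys=SETTING_KEYS):
    """Return {key: value}, using '<not set>' when the getter raises."""
    settings = {}
    for key in keys:
        try:
            settings[key] = get(key)
        except Exception:  # a key may be unset or unreadable on some clusters
            settings[key] = "<not set>"
    return settings


# Stand-in for spark.conf.get so the sketch runs anywhere (value is made up).
fake_conf = {"spark.sql.shuffle.partitions": "200"}
report = collect_settings(fake_conf.__getitem__)
for key, value in report.items():
    print(f"{key} = {value}")

# In a Databricks notebook (assumption: a SparkSession named `spark` exists):
# report = collect_settings(spark.conf.get)
```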

 

Let me know.

Cheers, Louis.

RheaC
New Contributor II

On a dataset with millions of rows, approxSimilarityJoin(df, df, …) can become slow because it has to build a large list of candidate pairs (rows that might match) before it can score and filter them.

Candidate explosion means your settings produce too many “maybe” pairs. Even if you later keep only matches > 0.95, Spark already paid the cost to generate and move those candidates around.

Shuffle is the network-heavy step where Spark moves data across the cluster to group rows that land in the same LSH buckets so they can be compared.

Skew is when some buckets end up much larger than others (often caused by very common tokens like “ltd”, “street”, etc.). A few overloaded tasks then dominate total runtime.
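
To put rough numbers on candidate explosion and skew, here is a back-of-the-envelope sketch in plain Python. The bucket sizes are made up, and it counts pairs for a single hash table only, so treat it as an illustration of the arithmetic rather than a model of what Spark does internally.

```python
from math import comb


def candidate_pairs(bucket_sizes):
    """Pairs a self-join has to consider if every bucket is compared internally."""
    return sum(comb(n, 2) for n in bucket_sizes)


# Two hypothetical layouts of the same 1,000,000 rows.
even = [10] * 100_000                 # 100,000 buckets of 10 rows each
lumpy = [10] * 90_000 + [100_000]     # one overloaded bucket of 100,000 rows

assert sum(even) == sum(lumpy) == 1_000_000

print(f"even buckets : {candidate_pairs(even):,} candidate pairs")
print(f"lumpy buckets: {candidate_pairs(lumpy):,} candidate pairs")
```

A single bucket of 100,000 rows contributes about 5 billion pairs on its own, which is why one very common token can dominate the whole job.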

If you keep it in Spark, the usual levers are:

  1. Block first to reduce which records can be compared, then run matching within each block.

  2. Make the LSH distance threshold line up with your final threshold (don’t generate candidates far below what you’ll accept).

  3. Remove junk inputs and very common tokens (short/placeholder strings, boilerplate words) to reduce collisions.
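
Here is a rough plain-Python sketch of the three levers, just to make the logic concrete. Everything in it is illustrative: the helper names, the sample records, and the one-letter block key are assumptions, not part of the original pipeline. Two details from the posted code are worth noting. approxSimilarityJoin takes a distance, so SIM_DISTANCE = 0.2 admits every pair with similarity of at least 0.8, while the final filter keeps only similarity above 0.95 (distance below 0.05). And a block key taken from a hash of the full string, as in Step 3, puts near-duplicates into different blocks, so a coarser key is needed.

```python
from collections import Counter, defaultdict
from itertools import combinations

POST_FILTER_SIM = 0.95                          # final threshold from the question
SIM_DISTANCE = round(1.0 - POST_FILTER_SIM, 6)  # lever 2: distance = 1 - similarity


def common_tokens(token_lists, max_doc_fraction):
    """Lever 3: tokens present in more than max_doc_fraction of the records."""
    doc_freq = Counter(tok for toks in token_lists for tok in set(toks))
    limit = max_doc_fraction * len(token_lists)
    return {tok for tok, n in doc_freq.items() if n > limit}


def jaccard_distance(a, b):
    union = a | b
    return 1.0 - len(a & b) / len(union) if union else 0.0


def blocked_matches(records, stop_tokens, max_distance):
    """Lever 1: compare only records that share a block key."""
    blocks = defaultdict(list)
    for rec_id, block, tokens in records:
        kept = frozenset(tokens) - stop_tokens
        if kept:  # skip records that are empty after cleaning
            blocks[block].append((rec_id, kept))
    pairs = []
    for members in blocks.values():
        for (id_a, a), (id_b, b) in combinations(members, 2):
            if jaccard_distance(a, b) <= max_distance:
                pairs.append((id_a, id_b))
    return sorted(pairs)


# Made-up records: (id, hypothetical block key, tokens).
records = [
    (1, "a", ["acme", "ltd", "12", "high", "street"]),
    (2, "a", ["acme", "12", "high", "street"]),
    (3, "a", ["apex", "ltd", "9", "mill", "street"]),
    (4, "b", ["bolt", "ltd", "3", "park", "street"]),
]

stop = common_tokens([tokens for _, _, tokens in records], max_doc_fraction=0.5)
matches = blocked_matches(records, stop, SIM_DISTANCE)
print(sorted(stop), matches)
```

In the Spark pipeline, the rough equivalents would be passing 1 - POST_FILTER_SIM (plus a small margin if you want one) as the join threshold and dropping the frequent tokens before HashingTF.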

If you’d rather avoid maintaining and tuning the LSH pipeline, look into Similarity API. It’s a hosted “dedupe within one dataset” service: you send the list once and it returns duplicate pairs/clusters, so your Databricks job does not run an all-to-all join. It’s optimized for large datasets and handles the blocking, candidate generation, and scoring for you. Preprocessing is optional but usually helpful (e.g., lowercasing, punctuation removal, token sorting), especially when names/addresses have formatting noise.

Most if not all of the above could be substituted with something like this:

import os, requests

resp = requests.post(
  "https://api.similarity-api.com/dedupe",
  headers={
    "Authorization": f"Bearer {os.environ['SIMILARITY_API_KEY']}",
    "Content-Type": "application/json",
  },
  json={
    "data": ["Microsoft", "Microsft", "Apple Inc.", "appLE"],  # .. more strings
    "config": {
      "similarity_threshold": 0.70,
      "top_n": 5,
      "remove_punctuation": True,
      "to_lowercase": True,
      "use_token_sort": False,
      "output_format": "flat_table",
    },
  }
)
resp.raise_for_status()  # surface HTTP errors before parsing the body
print(resp.json())

Here are the full docs: https://similarity-api.com/documentation
There's an article on how to do it in Databricks: https://similarity-api.com/blog/fuzzy-matching-in-databricks-2026

This will save you cluster time, but the service itself is paid: you need to create an account to get a token, and you pay once the trial ends.