Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Performance Issue with MinHash + Approx Similarity Join for Fuzzy Duplicate Detection

dsoat
New Contributor

Hello Community,

We have implemented a fuzzy matching logic in Databricks using the MinHash algorithm along with the approxSimilarityJoin API to identify duplicate records in a large dataset. While the logic is working correctly, we are facing a significant performance bottleneck — the execution time is much longer than expected, especially as the dataset size grows.

Details:

  • Purpose: Detect duplicates based on approximate string matching.

  • Approach:

    1. Tokenize and hash the strings.

    2. Apply MinHash for similarity signatures.

    3. Use approxSimilarityJoin to compare records.

  • Issue: The job is running extremely slow on datasets with millions of records.

Questions:

  1. Are there any known performance tuning strategies or optimizations for MinHash and approxSimilarityJoin in Spark/Databricks?

  2. Would alternative approaches (e.g., LSH variants, embeddings with ANN search) offer better scalability?

  3. Any configuration or partitioning recommendations to improve execution speed?

Any guidance, best practices, or code optimization tips would be greatly appreciated.

Thank you,
Below is the attached source code.

# Imports and Spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.storagelevel import StorageLevel
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, HashingTF, MinHashLSH
from pyspark.sql import functions as F
# Spark session
spark = SparkSession.builder.appName("FuzzyDuplicateDetection").getOrCreate()
# Parameters — adjust as needed
SIM_DISTANCE    = 0.2        # Jaccard distance threshold for LSH (controls candidates)
POST_FILTER_SIM = 0.95       # Final similarity threshold (strict filtering)
SOURCE_TABLE    = 'salesforce_dev.account'
# --- Corrected and Optimized Code ---
# Step 1: Read input data and preprocess
df_raw = spark.table(SOURCE_TABLE)   # assumes the source is SOURCE_TABLE; the notebook originally read a pre-built deduplication_input DataFrame here
# df_raw.display()
pattern = r"(?i)\btest\b"
df_raw = df_raw.filter(~col("name").rlike(pattern))   # drop obvious test records
df_raw = df_raw.na.drop(subset=["name", "kyb_addressline__c"])
print(df_raw.count())   # note: this count triggers a full pass over the data
# Step 2: Clean and prepare entity fields
df_clean = (
    df_raw.fillna({'name':'', 'kyb_addressline__c':''})
          .withColumn('name_c',  F.lower(F.regexp_replace('name', r'[^\p{L}\p{N}\s]', '')))
          .withColumn('addr_c',  F.lower(F.regexp_replace('kyb_addressline__c',r'[^\p{L}\p{N}\s]', '')))
          .withColumn('entity',  F.concat_ws(' ', 'name_c', 'addr_c'))
)
# Step 3: Blocking key generation (retained for potential future advanced blocking, but not used in the current LSH join)
df_processed = df_clean.withColumn(
    'block_key',
    F.sha2(F.concat_ws(' ', 'name_c', 'addr_c'), 256).substr(1, 4) # First 4 chars of SHA2 hash
)
# Step 4: Build LSH pipeline with improved parameters
tokenizer   = RegexTokenizer(inputCol="entity", outputCol="tokens", pattern="[^\\p{L}\\p{N}]+")
hashing_tf  = HashingTF(inputCol="tokens", outputCol="features", numFeatures=20000) # Increased features
minhash     = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=10) # Increased hash tables
hl_pipeline = Pipeline(stages=[tokenizer, hashing_tf, minhash])
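# Tuning note (not from the original post): approxSimilarityJoin explodes the MinHash
# signatures and joins on the individual hash values, so its cost grows with numHashTables;
# extra tables reduce false negatives but also generate more candidate pairs. If the join is
# the bottleneck, a smaller value may be worth trying first, e.g.:
# minhash = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)
# numFeatures on HashingTF mainly controls the token-collision rate, not the join cost.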
# Step 5: Fit and transform the entire dataset once, with a token-size filter
pipeline_model = hl_pipeline.fit(df_processed)
df_vect = pipeline_model.transform(df_processed) \
                      .filter(size(col('tokens')) > 0) \
                      .filter(size(col('tokens')) <= 100) \
                      .persist(StorageLevel.MEMORY_AND_DISK)
# df_vect.select("name","name_c","entity", "tokens").display()
# Get the trained LSH model from the fitted pipeline
minhash_model = pipeline_model.stages[-1]
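# Optional sketch (not from the original post): triggering one cheap action here materializes
# the persisted df_vect before the self-join, so the tokenize/hash/MinHash pipeline is not
# recomputed for both sides of approxSimilarityJoin.
# df_vect.count()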
# Step 6: Perform the distributed similarity join (no loop needed)
# The approxSimilarityJoin is a distributed operation designed for this.
matches = (
    minhash_model.approxSimilarityJoin(df_vect, df_vect, SIM_DISTANCE, distCol='jaccardDist')
                 .filter('datasetA.id < datasetB.id') # Avoid self-joins + duplicates
                 .filter(f'1 - jaccardDist > {POST_FILTER_SIM}') # Final strict filtering
                 .selectExpr(
                     'datasetA.id as id_a',
                     'datasetA.name as name_a',
                     'datasetA.name_c as name_c_a',
                     'datasetA.kyb_addressline__c as address_a',
                     'datasetB.id as id_b',
                     'datasetB.name as name_b',
                     'datasetB.name_c as name_c_b',
                     'datasetB.kyb_addressline__c as address_b',
                     '1 - jaccardDist as similarity'
                 )
)
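# Tuning note (not from the original post): the threshold passed to approxSimilarityJoin only
# filters candidate pairs after their true Jaccard distance has been computed; the candidates
# themselves come from hash collisions, so the expensive part scales with numHashTables and
# with very frequent tokens that collide often. Because the post-filter above keeps only
# jaccardDist < 1 - POST_FILTER_SIM = 0.05, passing a threshold closer to 0.05 instead of
# SIM_DISTANCE = 0.2 would at least shrink the joined output before the dedup step.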
# Step 7: Final deduplication and persist
# Normalize the pair ordering so each duplicate pair appears exactly once, then persist the result.
dupes_all = (
    matches.withColumn("min_id", least("id_a", "id_b"))
           .withColumn("max_id", greatest("id_a", "id_b"))
           .dropDuplicates(["min_id", "max_id"])
           .drop("min_id", "max_id")
           .persist(StorageLevel.MEMORY_AND_DISK)
)
# Step 8: Show the result, then release the intermediate cache
df = dupes_all
df.display()
df_vect.unpersist()   # unpersist only after the action above, so the cached vectors are actually reused

 

1 REPLY

BigRoux
Databricks Employee

Hey @dsoat, out of curiosity:

1. Expectations

You mentioned it’s “taking longer than expected.” What were your expectations, and what are you comparing them against?

2. Data

• What is the total size of the dataset after initial filtering (in GB or rows)?

• Are there any unusual data skews or highly frequent tokens/values in the matching columns?

• What is the estimated cardinality of the blocking keys or generated tokens? (A quick way to measure the skew and cardinality items is sketched after this list.)

• How many columns are involved in the LSH and join operations, and what are their data types?

• How many unique records participate in the self-join phase?

• Are there known outliers or unusually large input files that could create unbalanced partitions?
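For the skew and cardinality items, a rough sketch along these lines (reusing the column names from your snippet) gives quick numbers:

from pyspark.sql import functions as F

# Row count and approximate cardinality of the blocking key
df_processed.select(
    F.count(F.lit(1)).alias("rows"),
    F.approx_count_distinct("block_key").alias("approx_block_keys"),
).show()

# Most frequent tokens -- heavy hitters here inflate the number of LSH candidate pairs
(df_vect
 .select(F.explode("tokens").alias("token"))
 .groupBy("token").count()
 .orderBy(F.desc("count"))
 .show(20))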

3. Cluster Configuration

• How many nodes are in the cluster, and what are their specs (memory, vCPUs per node)?

• How many executors, executor cores, and how much executor memory is allocated?

• What value is set for spark.sql.shuffle.partitions, and has it been tuned for your workload? (A snippet for printing these settings follows this list.)

• Is dynamic resource allocation enabled (spark.dynamicAllocation.enabled), or is allocation static?

• Have you observed executor/driver OOM errors, excessive GC, or shuffle spill/writes in the Spark UI?

• Is Adaptive Query Execution (AQE) enabled in the Databricks workspace?
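For the configuration items, a minimal sketch like this pasted into a notebook cell prints the current values (defaults apply where nothing was set explicitly):

# SQL-layer settings (shuffle partitions, AQE)
for key in ["spark.sql.shuffle.partitions", "spark.sql.adaptive.enabled"]:
    print(key, "=", spark.conf.get(key))

# Cluster-level setting
sc_conf = spark.sparkContext.getConf()
print("spark.dynamicAllocation.enabled =", sc_conf.get("spark.dynamicAllocation.enabled", "false"))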

4. Environment

• What Spark and Databricks Runtime versions are you using?

• Are you working with Delta Lake tables, and if so, are they optimized/compacted? (A quick check is sketched after this list.)

• Is this workload running as a scheduled job or as ad-hoc analysis?
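And for the version and Delta layout questions, something like this is usually enough (a sketch; the table name is taken from your SOURCE_TABLE):

print(spark.version)   # Spark version of the attached cluster

# For a Delta table, DESCRIBE DETAIL reports numFiles and sizeInBytes -- a large number of
# small files is a hint that OPTIMIZE/compaction would help.
spark.sql("DESCRIBE DETAIL salesforce_dev.account") \
     .select("format", "numFiles", "sizeInBytes") \
     .show(truncate=False)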

 

Let me know.

Cheers, Louis.
