<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Performance Issue with MinHash + Approx Similarity Join for Fuzzy Duplicate Detection in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/performance-issue-with-minhash-approx-similarity-join-for-fuzzy/m-p/127781#M48081</link>
    <description>&lt;P&gt;Hello Community,&lt;/P&gt;&lt;P&gt;We have implemented fuzzy matching logic in Databricks using the &lt;STRONG&gt;MinHash&lt;/STRONG&gt; algorithm together with the &lt;STRONG&gt;approxSimilarityJoin&lt;/STRONG&gt; API to identify duplicate records in a large dataset. The logic works correctly, but we are facing a significant &lt;STRONG&gt;performance bottleneck&lt;/STRONG&gt;: the execution time is much longer than expected, and it gets worse as the dataset grows.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Details:&lt;/STRONG&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Purpose: Detect duplicates based on approximate string matching.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Approach:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Tokenize and hash the strings.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Apply MinHash to compute similarity signatures.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Use approxSimilarityJoin to compare records.&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Issue: The job runs extremely slowly on datasets with millions of records.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&lt;STRONG&gt;Questions:&lt;/STRONG&gt;&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Are there known performance tuning strategies or optimizations for MinHash and approxSimilarityJoin in Spark/Databricks?&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Would alternative approaches (e.g., LSH variants, embeddings with ANN search) scale better?&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Are there configuration or partitioning recommendations to improve execution speed?&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;Any guidance, best practices, or code optimization tips would be greatly appreciated.&lt;/P&gt;&lt;P&gt;Thank you,&lt;BR /&gt;The source code is below.&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# Imports and Spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.storagelevel import StorageLevel
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, HashingTF, MinHashLSH
from pyspark.sql import functions as F
# Spark session
spark = SparkSession.builder.appName("FuzzyDuplicateDetection").getOrCreate()
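# Parameters (adjust as needed)
POST_FILTER_SIM = 0.95       # Final Jaccard similarity threshold (strict filtering)
# Jaccard distance threshold for approxSimilarityJoin. Aligning it with POST_FILTER_SIM drops
# unwanted pairs inside the join itself; the number of candidate pairs that get compared is
# still driven by numHashTables and token overlap, not by this threshold.
SIM_DISTANCE    = 1 - POST_FILTER_SIM
SOURCE_TABLE    = 'salesforce_dev.account'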
# --- Corrected and Optimized Code ---
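# Step 1: Read input data and preprocess
# deduplication_input is assumed to be a DataFrame created earlier in the notebook
# (for example, spark.table(SOURCE_TABLE)); it is not defined in this snippet.
df_raw = deduplication_input
# df_raw.display()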
pattern = r"(?i)\btest\b"
df_raw = df_raw.filter(~col("name").rlike(pattern))
df_raw = df_raw.na.drop(subset=["name","kyb_addressline__c"])
print(df_raw.count())  # count() is an action: it triggers a full pass over the filtered input
# Step 2: Clean and prepare entity fields
df_clean = (
    df_raw.fillna({'name':'', 'kyb_addressline__c':''})
          .withColumn('name_c',  F.lower(F.regexp_replace('name', r'[^\p{L}\p{N}\s]', '')))
          .withColumn('addr_c',  F.lower(F.regexp_replace('kyb_addressline__c',r'[^\p{L}\p{N}\s]', '')))
          .withColumn('entity',  F.concat_ws(' ', 'name_c', 'addr_c'))
)
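# Sketch (hypothetical helper, not part of the original job): find very common tokens.
# Tokens such as "ltd" or "street" occur in many records. In an LSH join they create
# oversized buckets (skew) and many weak candidate pairs. This pure-Python version counts
# how many records contain each token and returns the top_k most frequent ones. The top_k
# cut-off is an assumption to tune on real data. In Spark, such a list could be passed to
# pyspark.ml.feature.StopWordsRemover(stopWords=...) between the tokenizer and HashingTF.
from collections import Counter

def most_common_tokens(entities, top_k):
    doc_freq = Counter()
    for text in entities:
        doc_freq.update(set(text.split()))  # count each token once per record
    return [token for token, _ in doc_freq.most_common(top_k)]

print(most_common_tokens(["acme ltd 1 main street", "acme ltd 2 main street",
                          "globex ltd 9 high street"], top_k=2))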
# Step 3: Blocking key generation (kept for possible future blocking; not used by the LSH join below)
df_processed = df_clean.withColumn(
    'block_key',
    F.sha2(F.concat_ws(' ', 'name_c', 'addr_c'), 256).substr(1, 4) # First 4 chars of SHA2 hash
)
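# Sketch (hypothetical, illustration only): why block_key cannot block fuzzy matches.
# block_key hashes the whole normalized string, so only exact duplicates are guaranteed
# to share a key. A one-letter typo gives an unrelated SHA-256 prefix. A key derived from
# the data itself, here a hypothetical "first 3 characters of name_c" rule, keeps typo
# variants together, and the pairs inside each block can then be scored with exact Jaccard
# similarity. In Spark, the same idea could be a self-join on the key plus
# F.array_intersect / F.array_union over the token arrays.
import hashlib
from itertools import combinations
from operator import ge

def sha_block_key(name_c, addr_c):
    # Python equivalent of F.sha2(F.concat_ws(' ', 'name_c', 'addr_c'), 256).substr(1, 4)
    return hashlib.sha256(f"{name_c} {addr_c}".encode("utf-8")).hexdigest()[:4]

def prefix_block_key(name_c):
    return name_c[:3]  # hypothetical blocking rule

def token_jaccard(text_a, text_b):
    a, b = set(text_a.split()), set(text_b.split())
    union = a.union(b)
    return len(a.intersection(b)) / len(union) if union else 0.0

def pairs_within_blocks(records, min_sim):
    # records: list of (id, name_c, entity) tuples; returns [(id_a, id_b, similarity)]
    blocks = {}
    for rec in records:
        blocks.setdefault(prefix_block_key(rec[1]), []).append(rec)
    found = []
    for members in blocks.values():
        for (id_a, _, ent_a), (id_b, _, ent_b) in combinations(members, 2):
            sim = token_jaccard(ent_a, ent_b)
            if ge(sim, min_sim):
                found.append((id_a, id_b, sim))
    return found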
# Step 4: Build the LSH pipeline
tokenizer   = RegexTokenizer(inputCol="entity", outputCol="tokens", pattern="[^\\p{L}\\p{N}]+")
hashing_tf  = HashingTF(inputCol="tokens", outputCol="features", numFeatures=20000)  # number of hash buckets for tokens
# More hash tables lower the false-negative rate, but they also increase the number of
# candidate pairs and the shuffle volume of approxSimilarityJoin.
minhash     = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=10)
hl_pipeline = Pipeline(stages=[tokenizer, hashing_tf, minhash])
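# Sketch: how numHashTables drives candidate volume (assumes ideal MinHash hash functions).
# Spark's MinHashLSH computes one MinHash value per hash table, and approxSimilarityJoin
# treats two rows as candidates when any table collides (OR-amplification). A single MinHash
# value collides with probability equal to the Jaccard similarity s. With L tables, a pair
# therefore becomes a candidate with probability 1 - (1 - s) ** L.
def candidate_probability(s, num_tables):
    return 1 - (1 - s) ** num_tables

for num_tables in (2, 3, 10):
    near_dup = candidate_probability(0.95, num_tables)  # pairs at POST_FILTER_SIM
    weak = candidate_probability(0.20, num_tables)      # weakly similar pairs
    print(num_tables, round(near_dup, 4), round(weak, 4))
# With 10 tables, about 89% of pairs at similarity 0.2 still become candidates. With 2 or 3
# tables, near-duplicate recall stays above 99.7%, and only 36% to 49% of those weak pairs
# become candidates.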
# Step 5: Fit and transform the entire dataset once, keeping rows with 1 to 100 tokens
pipeline_model = hl_pipeline.fit(df_processed)
df_vect = pipeline_model.transform(df_processed) \
                      .filter(size(col('tokens')) &amp;gt; 0) \
                      .filter(size(col('tokens')) &amp;lt;= 100) \
                      .persist(StorageLevel.MEMORY_AND_DISK)
# df_vect.select("name","name_c","entity", "tokens").display()
# Get the trained LSH model from the fitted pipeline
minhash_model = pipeline_model.stages[-1]
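# Sketch (pure Python, illustration only): the idea behind approxSimilarityJoin.
# Each record gets one MinHash value per table. Records that share any (table, value) bucket
# become candidate pairs, and pairs found by several tables are collapsed into one. The exact
# Jaccard distance is then computed, and only pairs below max_distance are kept. The hash
# family ((1 + x) * a + b) % P, the prime, the seed and the crc32 token ids are assumptions
# of this sketch, not Spark internals.
import random
import zlib
from itertools import combinations
from operator import lt

P = 2_147_483_647  # the prime 2**31 - 1

def minhash_signature(tokens, params):
    ids = [zlib.crc32(t.encode("utf-8")) for t in set(tokens)]
    return [min(((1 + x) * a + b) % P for x in ids) for a, b in params]

def lsh_self_join(records, num_tables, max_distance, seed=42):
    # records: dict of id -> non-empty token list; returns [(id_a, id_b, distance)]
    rng = random.Random(seed)
    params = [(rng.randint(1, P - 1), rng.randint(0, P - 1)) for _ in range(num_tables)]
    buckets = {}
    for rid, tokens in records.items():
        for table, value in enumerate(minhash_signature(tokens, params)):
            buckets.setdefault((table, value), []).append(rid)
    candidates = set()  # a set keeps each pair once even if several tables collide
    for ids in buckets.values():
        candidates.update(combinations(sorted(ids), 2))
    result = []
    for id_a, id_b in sorted(candidates):
        a, b = set(records[id_a]), set(records[id_b])
        dist = 1 - len(a.intersection(b)) / len(a.union(b))
        if lt(dist, max_distance):
            result.append((id_a, id_b, dist))
    return result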
# Step 6: Perform the distributed similarity join (no driver-side loop)
# The approxSimilarityJoin is a distributed operation designed for this.
matches = (
    minhash_model.approxSimilarityJoin(df_vect, df_vect, SIM_DISTANCE, distCol='jaccardDist')
                 .filter('datasetA.id &amp;lt; datasetB.id') # Avoid self-joins + duplicates
                 .filter(f'1 - jaccardDist &amp;gt; {POST_FILTER_SIM}') # Final strict filtering
                 .selectExpr(
                     'datasetA.id as id_a',
                     'datasetA.name as name_a',
                     'datasetA.name_c as name_c_a',
                     'datasetA.kyb_addressline__c as address_a',
                     'datasetB.id as id_b',
                     'datasetB.name as name_b',
                     'datasetB.name_c as name_c_b',
                     'datasetB.kyb_addressline__c as address_b',
                     '1 - jaccardDist as similarity'
                 )
)
# Step 7: Persist the final result
# No extra dropDuplicates or least/greatest normalization is needed: approxSimilarityJoin
# returns each (datasetA, datasetB) pair once, and the datasetA.id &amp;lt; datasetB.id filter
# already keeps a single ordering of every unordered pair.
dupes_all = matches.persist(StorageLevel.MEMORY_AND_DISK)
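# # Show result
# dupes_all.display()
# Step 8: Run the action first, then release the cached intermediate DataFrame.
# persist() is lazy. Unpersisting df_vect before display() would drop the cache before
# anything reads it, and the self-join would recompute the tokenize/hash pipeline.
df = dupes_all
df.display()
df_vect.unpersist()&lt;/LI-CODE&gt;</description>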
    <pubDate>Fri, 08 Aug 2025 11:02:03 GMT</pubDate>
    <dc:creator>dsoat</dc:creator>
    <dc:date>2025-08-08T11:02:03Z</dc:date>
    <item>
      <title>Performance Issue with MinHash + Approx Similarity Join for Fuzzy Duplicate Detection</title>
      <link>https://community.databricks.com/t5/data-engineering/performance-issue-with-minhash-approx-similarity-join-for-fuzzy/m-p/127781#M48081</link>
      <description></description>
      <pubDate>Fri, 08 Aug 2025 11:02:03 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/performance-issue-with-minhash-approx-similarity-join-for-fuzzy/m-p/127781#M48081</guid>
      <dc:creator>dsoat</dc:creator>
      <dc:date>2025-08-08T11:02:03Z</dc:date>
    </item>
    <item>
      <title>Re: Performance Issue with MinHash + Approx Similarity Join for Fuzzy Duplicate Detection</title>
      <link>https://community.databricks.com/t5/data-engineering/performance-issue-with-minhash-approx-similarity-join-for-fuzzy/m-p/132464#M49500</link>
      <description>&lt;P class="p1"&gt;Hey &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/178463"&gt;@dsoat&lt;/a&gt;, out of curiosity:&lt;/P&gt;
&lt;P class="p3"&gt;&lt;STRONG&gt;1. Expectations&lt;/STRONG&gt;&lt;/P&gt;
&lt;P class="p1"&gt;You mentioned it’s “taking longer than expected.” What were your expectations, and what are you comparing them against?&lt;/P&gt;
&lt;P class="p3"&gt;&lt;STRONG&gt;2. Data&lt;/STRONG&gt;&lt;/P&gt;
&lt;P class="p1"&gt;• What is the total size of the dataset after initial filtering (in GB or rows)?&lt;/P&gt;
&lt;P class="p1"&gt;• Are there any unusual data skews or highly frequent tokens/values in the matching columns?&lt;/P&gt;
&lt;P class="p1"&gt;• What is the estimated cardinality of the blocking keys or generated tokens?&lt;/P&gt;
&lt;P class="p1"&gt;• How many columns are involved in the LSH and join operations, and what are their data types?&lt;/P&gt;
&lt;P class="p1"&gt;• How many unique records participate in the self-join phase?&lt;/P&gt;
&lt;P class="p1"&gt;• Are there known outliers or unusually large input files that could create unbalanced partitions?&lt;/P&gt;
&lt;P class="p3"&gt;&lt;STRONG&gt;3. Cluster Configuration&lt;/STRONG&gt;&lt;/P&gt;
&lt;P class="p1"&gt;• How many nodes are in the cluster, and what are their specs (memory, vCPUs per node)?&lt;/P&gt;
&lt;P class="p1"&gt;• How many executors, executor cores, and how much executor memory is allocated?&lt;/P&gt;
&lt;P class="p1"&gt;• What value is set for &lt;SPAN class="s1"&gt;spark.sql.shuffle.partitions&lt;/SPAN&gt;, and has it been tuned for your workload?&lt;/P&gt;
&lt;P class="p1"&gt;• Is dynamic resource allocation enabled (&lt;SPAN class="s1"&gt;spark.dynamicAllocation.enabled&lt;/SPAN&gt;), or is allocation static?&lt;/P&gt;
&lt;P class="p1"&gt;• Have you observed executor/driver OOM errors, excessive GC, or shuffle spill/writes in the Spark UI?&lt;/P&gt;
&lt;P class="p1"&gt;• Is Adaptive Query Execution (AQE) enabled in the Databricks workspace?&lt;/P&gt;
&lt;P class="p3"&gt;&lt;STRONG&gt;4. Environment&lt;/STRONG&gt;&lt;/P&gt;
&lt;P class="p1"&gt;• What Spark and Databricks Runtime versions are you using?&lt;/P&gt;
&lt;P class="p1"&gt;• Are you working with Delta Lake tables, and if so, are they optimized/compacted?&lt;/P&gt;
&lt;P class="p1"&gt;• Is this workload running as a scheduled job or as ad-hoc analysis?&lt;/P&gt;
&lt;P class="p1"&gt;&amp;nbsp;&lt;/P&gt;
&lt;P class="p1"&gt;Let me know.&lt;/P&gt;
&lt;P class="p1"&gt;Cheers, Louis.&lt;/P&gt;</description>
      <pubDate>Thu, 18 Sep 2025 15:17:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/performance-issue-with-minhash-approx-similarity-join-for-fuzzy/m-p/132464#M49500</guid>
      <dc:creator>Louis_Frolio</dc:creator>
      <dc:date>2025-09-18T15:17:07Z</dc:date>
    </item>
    <item>
      <title>Re: Performance Issue with MinHash + Approx Similarity Join for Fuzzy Duplicate Detection</title>
      <link>https://community.databricks.com/t5/data-engineering/performance-issue-with-minhash-approx-similarity-join-for-fuzzy/m-p/149754#M53171</link>
      <description>&lt;P&gt;On a dataset with millions of rows, approxSimilarityJoin(df, df, …) can become slow because it has to build a large list of candidate pairs (rows that might match) before it can score and filter them.&lt;/P&gt;&lt;P&gt;Candidate explosion means your settings produce too many “maybe” pairs. Even if you later keep only matches &amp;gt; 0.95, Spark has already paid the cost of generating those candidates and moving them around.&lt;/P&gt;&lt;P&gt;Shuffle is the network-heavy step in which Spark moves data across the cluster to group rows that land in the same LSH buckets so they can be compared.&lt;/P&gt;&lt;P&gt;Skew is when some buckets end up much larger than others (often caused by very common tokens such as “ltd” or “street”). A few overloaded tasks then dominate the total runtime.&lt;/P&gt;&lt;P&gt;If you keep it in Spark, the usual levers are:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;Block first to limit which records can be compared, then run matching within each block.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Align the LSH distance threshold with your final threshold (don’t generate candidates far below what you’ll accept).&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Remove junk inputs and very common tokens (short or placeholder strings, boilerplate words) to reduce collisions.&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;&lt;STRONG&gt;If you’d rather avoid maintaining and tuning the LSH pipeline, &lt;/STRONG&gt;look into Similarity API. It’s a hosted “dedupe within one dataset” service: you send the list once and it returns duplicate pairs/clusters without running an all-to-all join in your Databricks job. It handles blocking, candidate generation, and scoring for you and is built for large datasets. Preprocessing is optional but usually helpful (e.g., lowercasing, punctuation removal, token sorting), especially when names/addresses have formatting noise.&lt;/P&gt;&lt;P&gt;Most, if not all, of the above could be replaced with a call like this:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import os, requests

resp = requests.post(
  "https://api.similarity-api.com/dedupe",
  headers={
    "Authorization": f"Bearer {os.environ['SIMILARITY_API_KEY']}",
    "Content-Type": "application/json",
  },
  json={
    "data": ["Microsoft", "Microsft", "Apple Inc.", "appLE"],  # ...plus the rest of your strings
    "config": {
      "similarity_threshold": 0.70,
      "top_n": 5,
      "remove_punctuation": True,
      "to_lowercase": True,
      "use_token_sort": False,
      "output_format": "flat_table",
    },
  }
)
resp.raise_for_status()  # raise on HTTP errors instead of printing an error body
print(resp.json())&lt;/LI-CODE&gt;&lt;P&gt;Here are the full docs: &lt;A href="https://similarity-api.com/documentation" target="_blank" rel="noopener"&gt;https://similarity-api.com/documentation&lt;/A&gt;&lt;BR /&gt;There's an article on how to do this in Databricks: &lt;A href="https://similarity-api.com/blog/fuzzy-matching-in-databricks-2026" target="_blank" rel="noopener"&gt;https://similarity-api.com/blog/fuzzy-matching-in-databricks-2026&lt;/A&gt;&lt;/P&gt;&lt;P&gt;This saves cluster time, but the service itself is paid: you need to create an account to get an API token, and usage is paid after a trial period.&lt;/P&gt;</description>
      <pubDate>Wed, 04 Mar 2026 08:58:36 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/performance-issue-with-minhash-approx-similarity-join-for-fuzzy/m-p/149754#M53171</guid>
      <dc:creator>RheaC</dc:creator>
      <dc:date>2026-03-04T08:58:36Z</dc:date>
    </item>
  </channel>
</rss>

