Hello Community,
We have implemented a fuzzy matching logic in Databricks using the MinHash algorithm along with the approxSimilarityJoin API to identify duplicate records in a large dataset. While the logic is working correctly, we are facing a significant performance bottleneck — the execution time is much longer than expected, especially as the dataset size grows.
Details:
Questions:
Are there any known performance tuning strategies or optimizations for MinHash and approxSimilarityJoin in Spark/Databricks?
Would alternative approaches (e.g., LSH variants, embeddings with ANN search) offer better scalability?
Any configuration or partitioning recommendations to improve execution speed?
Any guidance, best practices, or code optimization tips would be greatly appreciated.
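To make the third question concrete, the kind of tuning we have in mind is along the lines of the sketch below; the shuffle-partition count and the explicit repartition are placeholder guesses on our side, not settings we have validated:

# Sketch only: placeholder tuning, not validated settings.
spark.conf.set("spark.sql.shuffle.partitions", "400")  # placeholder parallelism for the LSH self-join
df_vect = df_vect.repartition(400)  # placeholder: spread the vectorized rows evenly before approxSimilarityJoin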
Thank you,
Below is the attached source code.
# Imports and Spark session
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, size, least, greatest
from pyspark.storagelevel import StorageLevel
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, HashingTF, MinHashLSH
# Spark session
spark = SparkSession.builder.appName("FuzzyDuplicateDetection").getOrCreate()
# Parameters — adjust as needed
SIM_DISTANCE = 0.2 # Jaccard distance threshold for LSH (controls candidates)
POST_FILTER_SIM = 0.95 # Final similarity threshold (strict filtering)
SOURCE_TABLE = 'salesforce_dev.account'
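# Note: approxSimilarityJoin thresholds on Jaccard DISTANCE, so SIM_DISTANCE = 0.2
# keeps candidate pairs with estimated similarity >= 0.8; POST_FILTER_SIM then applies
# the stricter 0.95 cut on 1 - jaccardDist after the join.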
# --- Corrected and Optimized Code ---
# Step 1: Read input data and preprocess
df_raw = spark.table(SOURCE_TABLE)  # assumed source; originally assigned from a pre-built deduplication_input DataFrame
# df_raw.display()
pattern = r"(?i)\btest\b"
df_raw = df_raw.filter(~col("name").rlike(pattern))
df_raw = df_raw.na.drop(subset=["name","kyb_addressline__c"])
print(df_raw.count())  # action: scans the filtered input just to log the row count
# Step 2: Clean and prepare entity fields
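# The regexp_replace keeps only Unicode letters (\p{L}), digits (\p{N}) and whitespace;
# name and address are then lower-cased and concatenated into a single 'entity' string.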
df_clean = (
df_raw.fillna({'name':'', 'kyb_addressline__c':''})
.withColumn('name_c', F.lower(F.regexp_replace('name', r'[^\p{L}\p{N}\s]', '')))
.withColumn('addr_c', F.lower(F.regexp_replace('kyb_addressline__c',r'[^\p{L}\p{N}\s]', '')))
.withColumn('entity', F.concat_ws(' ', 'name_c', 'addr_c'))
)
# Step 3: Blocking key generation (retained for possible future blocking; not used in the current LSH join)
df_processed = df_clean.withColumn(
'block_key',
F.sha2(F.concat_ws(' ', 'name_c', 'addr_c'), 256).substr(1, 4) # First 4 chars of SHA2 hash
)
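# sha2(..., 256) returns a hex string, so a 4-character prefix yields at most
# 16^4 = 65,536 distinct block keys.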
# Step 4: Build the LSH pipeline with improved parameters
tokenizer = RegexTokenizer(inputCol="entity", outputCol="tokens", pattern="[^\\p{L}\\p{N}]+")
hashing_tf = HashingTF(inputCol="tokens", outputCol="features", numFeatures=20000) # Increased features
minhash = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=10) # Increased hash tables
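# Trade-offs: numFeatures controls token-hashing collisions in HashingTF, while
# numHashTables trades recall for cost; more tables means fewer missed near-duplicates
# but more hashing work and a larger candidate set in approxSimilarityJoin.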
hl_pipeline = Pipeline(stages=[tokenizer, hashing_tf, minhash])
# Step 5: Fit and transform the entire dataset once, with a token-size filter
pipeline_model = hl_pipeline.fit(df_processed)
df_vect = pipeline_model.transform(df_processed) \
.filter(size(col('tokens')) > 0) \
.filter(size(col('tokens')) <= 100) \
.persist(StorageLevel.MEMORY_AND_DISK)
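# size(tokens) > 0 is required because MinHash cannot hash an all-zero vector;
# the <= 100 cap limits very long entities, which would otherwise generate a
# disproportionate number of candidate pairs in the join.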
# df_vect.select("name","name_c","entity", "tokens").display()
# Get the trained LSH model from the fitted pipeline
minhash_model = pipeline_model.stages[-1]
# Step 6: Perform the distributed similarity join (no per-record loop)
# The approxSimilarityJoin is a distributed operation designed for this.
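# It returns struct columns datasetA and datasetB plus the distance column named by
# distCol, which is why the expressions below use datasetA.<field> / datasetB.<field>.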
matches = (
minhash_model.approxSimilarityJoin(df_vect, df_vect, SIM_DISTANCE, distCol='jaccardDist')
.filter('datasetA.id < datasetB.id') # Drop self-pairs and mirrored (B, A) duplicates
.filter(f'1 - jaccardDist > {POST_FILTER_SIM}') # Final strict filtering
.selectExpr(
'datasetA.id as id_a',
'datasetA.name as name_a',
'datasetA.name_c as name_c_a',
'datasetA.kyb_addressline__c as address_a',
'datasetB.id as id_b',
'datasetB.name as name_b',
'datasetB.name_c as name_c_b',
'datasetB.kyb_addressline__c as address_b',
'1 - jaccardDist as similarity'
)
)
# Step 7: Final deduplication and persist
# The join filter already guarantees id_a < id_b, so the min/max normalization is a
# safeguard; deduplicate on the unordered pair once and persist only the final result.
dupes_all = (
    matches.withColumn("min_id", least("id_a", "id_b"))
           .withColumn("max_id", greatest("id_a", "id_b"))
           .dropDuplicates(["min_id", "max_id"])
           .drop("min_id", "max_id")
           .persist(StorageLevel.MEMORY_AND_DISK)
)
# # Show result
# dupes_all.display()
# Step 8: Unpersist the intermediate DataFrame
df_vect.unpersist()
df = dupes_all
df.display()