Fuzzy Match on PySpark using UDF/Pandas UDF

mohaimen_syed
New Contributor III

I'm trying to do fuzzy matching on two dataframes by cross joining them and then using a UDF for the fuzzy matching. But with both a Python UDF and a pandas UDF it's either very slow or I get an error.

 

@pandas_udf("int")
def core_match_processor(s1: pd.Series, s2: pd.Series) -> pd.Series:
return pd.Series(int(rapidfuzz.ratio(s1, s2)))

MatchUDF = f.pandas_udf(core_match_processor, returnType=IntegerType())
 
df0 = df1.crossJoin(broadcast(df2))
df = df0.withColumn("Score", MatchUDF(f.col("String1"), f.col("String2")))
 
Error: org.apache.spark.SparkRuntimeException: [UDF_USER_CODE_ERROR.GENERIC] Execution of function core_match_processor
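For context, a pandas UDF receives whole pd.Series batches, so the rapidfuzz call has to be applied element-wise rather than to the Series objects themselves. A minimal sketch of an element-wise version, assuming rapidfuzz.fuzz.ratio and the same String1/String2 columns (the function name fuzzy_ratio is illustrative, not from the thread):

import pandas as pd
from rapidfuzz import fuzz
from pyspark.sql import functions as f
from pyspark.sql.functions import broadcast, pandas_udf

@pandas_udf("int")
def fuzzy_ratio(s1: pd.Series, s2: pd.Series) -> pd.Series:
    # Apply fuzz.ratio to each pair of strings in the two batches
    return pd.Series([int(fuzz.ratio(a, b)) for a, b in zip(s1, s2)])

df = (
    df1.crossJoin(broadcast(df2))
       .withColumn("Score", fuzzy_ratio(f.col("String1"), f.col("String2")))
)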

Kaniz
Community Manager

Hi @mohaimen_syed, one approach to improving the performance of your fuzzy matching is to use PySpark's built-in string similarity functions, such as levenshtein and soundex. These functions are optimized for distributed processing and can be used directly on PySpark DataFrames without the need for UDFs.

Check out this article for reference:- https://mrpowers.medium.com/fuzzy-matching-in-spark-with-soundex-and-levenshtein-distance-6749f5af8f...
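A minimal sketch of that approach, assuming the same String1/String2 columns; the edit-distance threshold of 3 is an illustrative choice, not from the thread:

from pyspark.sql import functions as f
from pyspark.sql.functions import broadcast

# Cross join and score with built-in functions instead of a UDF
scored = (
    df1.crossJoin(broadcast(df2))
       .withColumn("edit_distance", f.levenshtein(f.col("String1"), f.col("String2")))
       .withColumn("soundex_match", f.soundex(f.col("String1")) == f.soundex(f.col("String2")))
)

# Keep only close matches, e.g. edit distance below the illustrative threshold
matches = scored.filter(f.col("edit_distance") < 3)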

mohaimen_syed
New Contributor III

I'm now getting the error: (SQL_GROUPED_AGG_PANDAS_UDF) is not supported on clusters in Shared access mode.
Even though this article clearly states that pandas UDFs are supported on shared clusters in Databricks:

https://www.databricks.com/blog/shared-clusters-unity-catalog-win-introducing-cluster-libraries-pyth...

Kaniz
Community Manager

Hi @mohaimen_syed, could you please help me with these details:

- Cluster details, and

- Whether Apache Arrow optimization is enabled on your cluster.

mohaimen_syed
New Contributor III

Cluster:
Policy: Shared Compute

Access: Shared

Runtime: 14.1 (includes Apache Spark 3.5.0, Scala 2.12)

Worker type: Standard_L8s_v3 (64 GB Memory, 8 Cores), workers: 1-60
Driver type: Standard_L8s_v3 (64 GB Memory, 8 Cores)

I added this line in my python notebook: 

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true"), which I believe will enable Apache Arrow optimization.
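A quick way to confirm the setting took effect, using the standard Spark conf API:

# Enable Arrow-based columnar data transfers for pandas UDFs
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Verify the current value; should print "true"
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))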

Any updates here? I'm running into the same problem with serverless compute.
