cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Get Started Discussions
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Fuzzy Match on PySpark using UDF/Pandas UDF

mohaimen_syed
New Contributor III

I'm trying to do fuzzy matching on two dataframes by cross joining them and then using a udf for my fuzzy matching. But using both python udf and pandas udf its either very slow or I get an error.

 

@pandas_udf("int")
def core_match_processor(s1: pd.Series, s2: pd.Series) -> pd.Series:
return pd.Series(int(rapidfuzz.ratio(s1, s2)))

MatchUDF = f.pandas_udf(core_match_processor, returnType=IntegerType())
 
df0 = df1.crossJoin(broadcast(df2))
df = df0.withColumn("Score", MatchUDF(f.col("String1"), f.col("String2")))
 
Error: org.apache.spark.SparkRuntimeException: [UDF_USER_CODE_ERROR.GENERIC] Execution of function core_match_processor
5 REPLIES 5

Kaniz
Community Manager
Community Manager

Hi @mohaimen_syed, One approach to improving the performance of your fuzzy matching UDF is to use PySpark's built-in String similarity functions, such as levenshtein, soundex, or metaphone. These functions are optimized for distributed processing and can be used directly on PySpark DataFrames without the need for UDFs.

Check out this article for reference:- https://mrpowers.medium.com/fuzzy-matching-in-spark-with-soundex-and-levenshtein-distance-6749f5af8f...

mohaimen_syed
New Contributor III

I'm now getting the error: (SQL_GROUPED_AGG_PANDAS_UDF) is not supported on clusters in Shared access mode.
Even though this article clearly states that pandas udf is supported for shared cluster in databricks

https://www.databricks.com/blog/shared-clusters-unity-catalog-win-introducing-cluster-libraries-pyth...

Kaniz
Community Manager
Community Manager

Hi @mohaimen_syed, Could you please help me with these details:- 

- Cluster details, and

- Check if Apache Apache Arrow optimization is enabled in your cluster.

mohaimen_syed
New Contributor III

Cluster:
Policy: Shared Compute

Access: Shared

Runtime: 14.1 (includes Apache Spark 3.5.0, Scala 2.12)

Worker type: Standard_L8s_v3 (64 GB Memory, 8 Cores) - workers- 1-60
Driver type: Standard_L8s_v3 (64 GB Memory, 8 Cores)

I added this line in my python notebook: 

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") which I believe will enable Apache Apache Arrow optimization.

Any updates here? I'm running into the same problem with serverless compute