Hello,
I am currently evaluating Microsoft's Presidio de-identification libraries for my organization and would like to take advantage of Spark's broadcast capabilities, but I am getting an error message:
"[BROADCAST_VARIABLE_NOT_LOADED] Broadcast variable `2872` not loaded."
According to the Databricks Cluster KB, using broadcast on a shared cluster is not possible, and the suggested solution is to "Use a single-user cluster or pass a variable into a function as a state instead". My organization does not currently allow single-user clusters, and I am frankly confused by "pass a variable into a function as a state instead" and how to do that in Databricks Spark.
If someone could provide some guidance on passing a variable into a function as state, that would be greatly appreciated.
Below is the code I am trying to run that produces the error.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

anonymized_column = "note_text"  # name of the column to anonymize

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

# broadcast the engines to the cluster nodes
broadcasted_analyzer = sc.broadcast(analyzer)
broadcasted_anonymizer = sc.broadcast(anonymizer)

spark.conf.set("spark.databricks.broadcastTimeout", "600s")
# define a pandas UDF: a per-text function and a series function over it
def anonymize_text(text: str) -> str:
    analyzer = broadcasted_analyzer.value
    anonymizer = broadcasted_anonymizer.value
    analyzer_results = analyzer.analyze(text=text, language="en")
    anonymized_results = anonymizer.anonymize(
        text=text,
        analyzer_results=analyzer_results,
        operators={
            "DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"})
        },
    )
    return anonymized_results.text

def anonymize_series(s: pd.Series) -> pd.Series:
    return s.apply(anonymize_text)

# register the series function as a pandas UDF
anonymize = pandas_udf(anonymize_series, returnType=StringType())
# convert the pandas DataFrame to a Spark DataFrame
spark_report_df = spark.createDataFrame(report_df)

# apply the UDF
anonymized_df = spark_report_df.withColumn(
    anonymized_column, anonymize(col(anonymized_column))
)
display(anonymized_df)