mark_ott
Databricks Employee

You’re encountering the [BROADCAST_VARIABLE_NOT_LOADED] error because Databricks clusters in shared access mode cannot use broadcast variables with non-serializable Python objects (such as your Presidio engines) due to cluster architecture limitations. The cluster KB is correct: you must use a different approach, namely to “pass a variable into a function as state instead.”

Solutions for Passing Variables as State

In PySpark, instead of using broadcast variables (which require serialization), you should:

  • Instantiate your analyzer and anonymizer inside the UDF itself.

  • This ensures each worker node creates its own local instance when the UDF executes.

  • This pattern is commonly called "lazy initialization" and avoids broadcast altogether.

Most notably, do not instantiate your Presidio engines outside the UDF or attempt to broadcast them. Instead, initialize them within the UDF’s scope.


Example Rewrite for Databricks Shared Cluster

Here’s how you could change your code:

```python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

anonymized_column = "note_text"  # name of column to anonymize

def anonymize_text(text: str) -> str:
    # Initialize inside the function so each worker gets its own instance
    from presidio_analyzer import AnalyzerEngine
    from presidio_anonymizer import AnonymizerEngine
    from presidio_anonymizer.entities import OperatorConfig  # OperatorConfig lives in .entities

    analyzer = AnalyzerEngine()
    anonymizer = AnonymizerEngine()

    analyzer_results = analyzer.analyze(text=text, language="en")
    anonymized_results = anonymizer.anonymize(
        text=text,
        analyzer_results=analyzer_results,
        operators={
            "DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"})
        },
    )
    return anonymized_results.text

def anonymize_series(s: pd.Series) -> pd.Series:
    return s.apply(anonymize_text)

# define the function as a pandas UDF
anonymize = pandas_udf(anonymize_series, returnType=StringType())

# convert the pandas DataFrame to a Spark DataFrame
spark_report_df = spark.createDataFrame(report_df)

# apply the UDF
anonymized_df = spark_report_df.withColumn(
    anonymized_column, anonymize(col(anonymized_column))
)
display(anonymized_df)
```

Key Points

  • The analyzer and anonymizer are not broadcast or shared; they are created fresh on the workers when the UDF runs.

  • This avoids serialization entirely and works around the shared access mode restriction.

  • The performance trade-off is that each worker must pay the initialization cost itself, but this is necessary in this environment.


Additional Recommendations

  • If initialization time for the engines is significant and you have control over your cluster policies, you could investigate using a singleton pattern within the workers. Otherwise, instantiation inside the UDF is safest for compatibility.

  • Always refer to Presidio’s documentation for any specific serialization or concurrency guidance for its engines.
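The singleton idea mentioned above can be sketched with `functools.lru_cache`: the engines are built at most once per worker process, and later UDF calls reuse the cached instances. `ExpensiveEngine` here is a hypothetical stand-in for a heavyweight object such as a Presidio `AnalyzerEngine`, and `process` stands in for the real anonymization logic:

```python
from functools import lru_cache

class ExpensiveEngine:
    """Stand-in for a heavyweight, slow-to-construct object
    (e.g. a Presidio AnalyzerEngine loading NLP models)."""
    instances_created = 0  # track constructions to show caching works

    def __init__(self):
        ExpensiveEngine.instances_created += 1

@lru_cache(maxsize=1)
def get_engine() -> ExpensiveEngine:
    # Runs the constructor only on the first call in this process;
    # every later call returns the same cached instance.
    return ExpensiveEngine()

def process(text: str) -> str:
    engine = get_engine()  # cheap after the first call
    return text.upper()    # placeholder for the real analyze/anonymize work
```

Because the cache lives at module level inside each worker process, nothing is broadcast or serialized; each executor simply pays the construction cost once instead of once per row.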


Cluster KB Reference

Here’s the relevant advice from Databricks KB:

“Use a single-user cluster or pass a variable into a function as a state instead.” This means initializing the variable within the function or UDF, not as a broadcast variable.
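To illustrate the “pass a variable as state” wording: plain, picklable values (strings, dicts, configs) can be baked into the function via a factory/closure, and Spark serializes them together with the UDF itself, so no broadcast variable is needed; only non-serializable objects like the engines require in-function construction. This is a minimal sketch with a hypothetical `make_anonymizer_udf` and placeholder logic, not the real Presidio calls:

```python
def make_anonymizer_udf(replacement: str):
    # `replacement` is simple, picklable state captured in the closure;
    # Spark ships it with the returned function, so sc.broadcast is
    # never involved.
    def anonymize(text: str) -> str:
        # Placeholder standing in for the real analyze/anonymize work.
        return replacement if text else text
    return anonymize

redact = make_anonymizer_udf("<ANONYMIZED>")
```

The non-serializable engines would still be constructed inside `anonymize`; the closure only carries the lightweight configuration.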


Summary Table

| Approach | Supported in Shared Mode | Usage | Notes |
|---|---|---|---|
| Broadcast variables | No | `sc.broadcast(obj)` | Fails with non-serializable objects |
| State variable in function/UDF | Yes | Init inside the function/UDF | Best for Python objects |

Switching to UDF-level initialization is the standard solution for using Presidio on Databricks shared clusters. If instantiation efficiency is a concern, consider reaching out via the Presidio GitHub discussions or support email for optimization tips.