Trying to use Broadcast to run Presidio distributed
03-05-2025 03:53 PM
Hello,
I am currently evaluating Microsoft's Presidio de-identification libraries for my organization and would like to see if we can take advantage of Spark's broadcast capabilities, but I am getting an error message:
"[BROADCAST_VARIABLE_NOT_LOADED] Broadcast variable `2872` not loaded."
According to the Databricks Cluster KB, using broadcast on a shared cluster is not possible, and the suggested solution is to "Use a single-user cluster or pass a variable into a function as a state instead". My organization does not currently allow single-user clusters, and I am frankly confused by "pass a variable into a function as a state instead" and how to do that in Databricks Spark.
If someone could provide some guidance on passing a state variable into a function, that would be greatly appreciated.
Below is the code I am trying to run that produces the error.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

anonymized_column = "note_text"  # name of the column to anonymize

# broadcast the engines to the cluster nodes
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
broadcasted_analyzer = sc.broadcast(analyzer)
broadcasted_anonymizer = sc.broadcast(anonymizer)
spark.conf.set("spark.databricks.broadcastTimeout", "600s")

# define a pandas UDF and a series function over it
def anonymize_text(text: str) -> str:
    analyzer = broadcasted_analyzer.value
    anonymizer = broadcasted_anonymizer.value
    analyzer_results = analyzer.analyze(text=text, language="en")
    anonymized_results = anonymizer.anonymize(
        text=text,
        analyzer_results=analyzer_results,
        operators={
            "DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"})
        },
    )
    return anonymized_results.text

def anonymize_series(s: pd.Series) -> pd.Series:
    return s.apply(anonymize_text)

# define the function as a pandas UDF
anonymize = pandas_udf(anonymize_series, returnType=StringType())

# convert the pandas DataFrame to a Spark DataFrame
spark_report_df = spark.createDataFrame(report_df)

# apply the UDF
anonymized_df = spark_report_df.withColumn(
    anonymized_column, anonymize(col(anonymized_column))
)
display(anonymized_df)
11-09-2025 06:31 AM
You’re encountering the [BROADCAST_VARIABLE_NOT_LOADED] error because Databricks clusters in shared access mode cannot load driver-side broadcast variables on the workers, so broadcasting complex Python objects such as your Presidio engines fails. The cluster KB is correct: you must use a different approach—namely, “pass a variable into a function as state instead.”
Solutions for Passing Variables as State
In PySpark, instead of using broadcast variables (which require serialization), you should:
- Instantiate your analyzer and anonymizer inside the UDF function.
- This ensures each worker node creates its own local instance when the UDF executes.
- This pattern is commonly called "lazy initialization" and avoids broadcast altogether.

Most notably, do not instantiate your Presidio engines outside the UDF or attempt to broadcast them. Instead, initialize them within the UDF’s scope.
Example Rewrite for Databricks Shared Cluster
Here’s how you could change your code:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

anonymized_column = "note_text"  # name of the column to anonymize

def anonymize_series(s: pd.Series) -> pd.Series:
    # Initialize inside the UDF so each worker creates its own instances.
    # Doing this once per batch (rather than once per row) avoids
    # re-loading the NLP models for every value in the series.
    from presidio_analyzer import AnalyzerEngine
    from presidio_anonymizer import AnonymizerEngine
    from presidio_anonymizer.entities import OperatorConfig

    analyzer = AnalyzerEngine()
    anonymizer = AnonymizerEngine()

    def anonymize_text(text: str) -> str:
        analyzer_results = analyzer.analyze(text=text, language="en")
        anonymized_results = anonymizer.anonymize(
            text=text,
            analyzer_results=analyzer_results,
            operators={
                "DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"})
            },
        )
        return anonymized_results.text

    return s.apply(anonymize_text)

# define the function as a pandas UDF
anonymize = pandas_udf(anonymize_series, returnType=StringType())

# convert the pandas DataFrame to a Spark DataFrame
spark_report_df = spark.createDataFrame(report_df)

# apply the UDF
anonymized_df = spark_report_df.withColumn(
    anonymized_column, anonymize(col(anonymized_column))
)
display(anonymized_df)
Key Points
- The analyzer and anonymizer are not broadcast or shared from the driver; they are created fresh on the workers.
- This avoids the serialization issue and works within the Databricks shared-mode restrictions.
- The trade-off is that each worker must initialize its own copy, but that is necessary in this environment.
Additional Recommendations
- If initialization time for the engines is significant and you have control over your cluster policies, you could investigate using singleton patterns within the workers. Otherwise, instantiation inside the UDF is safest for compatibility.
- Always refer to Presidio’s documentation for any specific serialization or concurrency guidance for its engines.
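As a sketch of the singleton idea mentioned above: engines can be cached at module level so each Python worker process constructs them at most once, no matter how many rows it handles. This example uses a stand-in `Engine` class in place of the Presidio engines (which are assumed to be expensive to construct); it is an illustration of the pattern, not Presidio-specific API.

```python
from functools import lru_cache

class Engine:
    """Stand-in for a heavyweight object such as Presidio's AnalyzerEngine."""
    instances_created = 0

    def __init__(self):
        Engine.instances_created += 1

@lru_cache(maxsize=1)
def get_engine() -> Engine:
    # First call constructs the engine; later calls in the same
    # Python worker process return the cached instance.
    return Engine()

def anonymize_text(text: str) -> str:
    engine = get_engine()  # cheap after the first call
    return "<ANONYMIZED>"  # placeholder for real analyze/anonymize logic

# Even after many calls, only one Engine is ever constructed per process.
for _ in range(1000):
    anonymize_text("call my cell: 555-0100")
```

Because the cache lives in the worker's module scope, nothing is shipped from the driver, so it stays compatible with shared access mode.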
Cluster KB Reference
Here’s the relevant advice from Databricks KB:
“Use a single-user cluster or pass a variable into a function as a state instead.” This means initializing the variable within the function or UDF, NOT as a broadcast variable.
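To make the KB's wording concrete, here is a minimal sketch of the state-in-function pattern, using a hypothetical `build_state` helper in place of the Presidio engine constructors:

```python
# Broadcast pattern (rejected in shared access mode): state is built on
# the driver and shipped to the workers via sc.broadcast(state).
#
# State-in-function pattern: state is built inside the function itself,
# so nothing needs to be shipped from the driver.

def build_state():
    # Hypothetical stand-in for AnalyzerEngine() / AnonymizerEngine()
    return {"new_value": "<ANONYMIZED>"}

def redact(text: str) -> str:
    state = build_state()  # created locally wherever the function runs
    return state["new_value"]

print(redact("call me at 555-0100"))  # -> <ANONYMIZED>
```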
Summary Table
| Approach | Supported in Shared Mode | Usage | Notes |
|---|---|---|---|
| Broadcast Variables | ❌ No | `sc.broadcast(obj)` | Fails with non-serializable objects |
| State Variable in Function/UDF | ✅ Yes | init inside function/UDF | Best for Python objects |
Switching to UDF-level initialization is the standard solution for running Presidio on Databricks shared clusters. If you need more efficient instantiation, consider reaching out via the Presidio GitHub discussions or support email for optimization tips.