Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Trying to use Broadcast to run Presidio distributed

kertsman_nm
New Contributor

Hello,

I am currently evaluating Microsoft's Presidio de-identification libraries for my organization and would like to see if we can take advantage of Spark's broadcast capabilities, but I am getting an error message:

"[BROADCAST_VARIABLE_NOT_LOADED] Broadcast variable `2872` not loaded."

According to the Databricks Cluster KB, using broadcast on a shared cluster is not possible (Cluster KB), and the suggested solution is to "Use a single-user cluster or pass a variable into a function as a state instead". My organization does not currently allow single-user clusters, and I am frankly confused about what "pass a variable into a function as a state" means and how to do that in Databricks Spark.

If someone could provide some guidance on passing a state variable into a function, that would be greatly appreciated.

Below is the code that I am trying to run that gives me the error.

import pandas as pd
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

anonymized_column = "note_text" # name of column to anonymize
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

broadcasted_analyzer = sc.broadcast(analyzer)
broadcasted_anonymizer = sc.broadcast(anonymizer)
# broadcast the engines to the cluster nodes
spark.conf.set("spark.databricks.broadcastTimeout", "600s")
# define a pandas UDF function and a series function over it.
def anonymize_text(text: str) -> str:
    analyzer = broadcasted_analyzer.value
    anonymizer = broadcasted_anonymizer.value
    analyzer_results = analyzer.analyze(text=text, language="en")
    anonymized_results = anonymizer.anonymize(
        text=text,
        analyzer_results=analyzer_results,
        operators={
            "DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"})
        },
    )
    return anonymized_results.text

def anonymize_series(s: pd.Series) -> pd.Series:
    return s.apply(anonymize_text)

# define the function as a pandas UDF
anonymize = pandas_udf(anonymize_series, returnType=StringType())

# convert Pandas DataFrame to Spark DataFrame
spark_report_df = spark.createDataFrame(report_df)

# apply the udf
anonymized_df = spark_report_df.withColumn(
    anonymized_column, anonymize(col(anonymized_column))
)
display(anonymized_df)

 

1 REPLY

mark_ott
Databricks Employee

You're encountering the [BROADCAST_VARIABLE_NOT_LOADED] error because Databricks in shared access mode cannot use broadcast variables with non-serializable Python objects (such as your Presidio engines) due to cluster architecture limitations. The cluster KB is correct: you must use a different approach, namely "pass a variable into a function as state instead."

Solutions for Passing Variables as State

In PySpark, instead of using broadcast variables (which require serialization), you should:

  • Instantiate your analyzer and anonymizer inside the UDF function.

  • This ensures each worker node creates its own local instance when the UDF executes.

  • This pattern is commonly called "lazy initialization" and avoids broadcast altogether.

Most notably, do not instantiate your Presidio engines outside the UDF or attempt to broadcast them. Instead, initialize them within the UDF's scope.
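As a minimal sketch of that pattern, here it is with plain pandas and a stub class standing in for the Presidio engines (StubAnonymizer is illustrative only; nothing here is Spark- or Presidio-specific):

```python
import pandas as pd

class StubAnonymizer:
    """Hypothetical stand-in for Presidio's AnalyzerEngine/AnonymizerEngine."""
    def redact(self, text: str) -> str:
        return "<ANONYMIZED>"

def anonymize_series(s: pd.Series) -> pd.Series:
    # State lives inside the function: each call (i.e. each worker
    # executing the UDF) builds its own engine, so nothing needs to
    # be broadcast or serialized from the driver.
    engine = StubAnonymizer()
    return s.apply(engine.redact)
```

The same shape applies when the function is wrapped with pandas_udf: the engine is constructed where the function runs, not shipped to it.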


Example Rewrite for Databricks Shared Cluster

Hereโ€™s how you could change your code:

python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

anonymized_column = "note_text"  # name of column to anonymize

def anonymize_text(text: str) -> str:
    # Initialize inside the function so each worker gets its own instance
    from presidio_analyzer import AnalyzerEngine
    from presidio_anonymizer import AnonymizerEngine, OperatorConfig

    analyzer = AnalyzerEngine()
    anonymizer = AnonymizerEngine()
    analyzer_results = analyzer.analyze(text=text, language="en")
    anonymized_results = anonymizer.anonymize(
        text=text,
        analyzer_results=analyzer_results,
        operators={
            "DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"})
        },
    )
    return anonymized_results.text

def anonymize_series(s: pd.Series) -> pd.Series:
    return s.apply(anonymize_text)

# define the function as a pandas UDF
anonymize = pandas_udf(anonymize_series, returnType=StringType())

# convert Pandas DataFrame to Spark DataFrame
spark_report_df = spark.createDataFrame(report_df)

# apply the udf
anonymized_df = spark_report_df.withColumn(
    anonymized_column, anonymize(col(anonymized_column))
)
display(anonymized_df)

Key Points

  • The analyzer and anonymizer are not broadcast or shared, but created fresh on each executor.

  • This avoids serialization issues and gets around the Databricks shared mode restrictions.

  • The performance trade-off is that each executor must initialize its own copy, but it's necessary in this environment.


Additional Recommendations

  • If initialization time for the engines is significant and you have control over your cluster policies, you could investigate using singleton patterns within the workers. Otherwise, instantiation inside the UDF is safest for compatibility.

  • Always refer to Presidio's documentation for any specific serialization or concurrency guidance for its engines.
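One way to sketch that worker-side singleton is with functools.lru_cache, which caches one instance per Python process (ExpensiveEngine is a hypothetical stand-in for a costly-to-build Presidio engine):

```python
from functools import lru_cache

class ExpensiveEngine:
    """Hypothetical stand-in for a costly-to-build Presidio engine."""
    init_count = 0  # tracks how many times an engine is actually constructed

    def __init__(self):
        ExpensiveEngine.init_count += 1

    def redact(self, text: str) -> str:
        return "<ANONYMIZED>"

@lru_cache(maxsize=1)
def get_engine() -> ExpensiveEngine:
    # The first call in each worker process builds the engine;
    # subsequent calls reuse the cached instance.
    return ExpensiveEngine()

def anonymize_text(text: str) -> str:
    return get_engine().redact(text)
```

With this pattern, each executor process pays the initialization cost once rather than once per row, while still keeping all state inside functions that run on the workers.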


Cluster KB Reference

Hereโ€™s the relevant advice from Databricks KB:

"Use a single-user cluster or pass a variable into a function as a state instead." This means initializing the variable within the function or UDF, NOT as a broadcast variable.


Summary Table

Approach | Supported in Shared Mode | Usage | Notes
Broadcast variables | No | sc.broadcast(obj) | Fails with non-serializable objects
State variable in function/UDF | Yes | Initialize inside the function/UDF | Best for Python objects

Switching to UDF-level initialization is the standard solution for Presidio usage on Databricks shared clusters. If you need more efficient instantiation, consider reaching out via the Presidio GitHub discussions or support channels for optimization tips.
