Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Trying to use Broadcast to run Presidio distributed

kertsman_nm
New Contributor

Hello,

I am currently evaluating Microsoft's Presidio de-identification libraries for my organization and would like to see if we can take advantage of Spark's broadcast capabilities, but I am getting an error message:

"[BROADCAST_VARIABLE_NOT_LOADED] Broadcast variable `2872` not loaded."

According to the Databricks Cluster KB, using broadcast on a shared cluster is not possible (Cluster KB), and the suggested solution is to "Use a single-user cluster or pass a variable into a function as a state instead". A single-user cluster is not an option because my organization does not currently allow that, and I am frankly confused by what "pass a variable into a function as a state instead" means and how to do that in Databricks Spark.

If someone could provide some guidance on passing a variable into a function as state, that would be greatly appreciated.

Below is the code that I am trying to run that gives me the error.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

anonymized_column = "note_text" # name of column to anonymize
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

# broadcast the engines to the cluster nodes
broadcasted_analyzer = sc.broadcast(analyzer)
broadcasted_anonymizer = sc.broadcast(anonymizer)
spark.conf.set("spark.databricks.broadcastTimeout", "600s")
# define a function to anonymize a single text value and a series function over it
def anonymize_text(text: str) -> str:
    analyzer = broadcasted_analyzer.value
    anonymizer = broadcasted_anonymizer.value
    analyzer_results = analyzer.analyze(text=text, language="en")
    anonymized_results = anonymizer.anonymize(
        text=text,
        analyzer_results=analyzer_results,
        operators={
            "DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"})
        },
    )
    return anonymized_results.text

def anonymize_series(s: pd.Series) -> pd.Series:
    return s.apply(anonymize_text)

# define the series function as a pandas UDF
anonymize = pandas_udf(anonymize_series, returnType=StringType())

# convert Pandas DataFrame to Spark DataFrame
spark_report_df = spark.createDataFrame(report_df)

# apply the udf
anonymized_df = spark_report_df.withColumn(
    anonymized_column, anonymize(col(anonymized_column))
)
display(anonymized_df)
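
For reference, here is my rough guess at what "pass a variable into a function as a state" might look like: build the engines inside the pandas UDF itself, so each executor constructs its own copies instead of reading from a broadcast variable. I am not at all sure this is what the KB means (the "_stateful" names below are just mine, to tell it apart from the broadcast version), so please correct me if this is the wrong approach.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

def anonymize_series_stateful(s: pd.Series) -> pd.Series:
    # construct the engines inside the function ("state" held by the UDF)
    # instead of pulling them out of a broadcast variable
    analyzer = AnalyzerEngine()
    anonymizer = AnonymizerEngine()

    def anonymize_text(text: str) -> str:
        analyzer_results = analyzer.analyze(text=text, language="en")
        anonymized_results = anonymizer.anonymize(
            text=text,
            analyzer_results=analyzer_results,
            operators={
                "DEFAULT": OperatorConfig("replace", {"new_value": "<ANONYMIZED>"})
            },
        )
        return anonymized_results.text

    return s.apply(anonymize_text)

anonymize_stateful = pandas_udf(anonymize_series_stateful, returnType=StringType())

anonymized_df = spark_report_df.withColumn(
    anonymized_column, anonymize_stateful(col(anonymized_column))
)
display(anonymized_df)

My worry with this version is that AnalyzerEngine() reloads the NLP model for every batch the UDF processes, which seems expensive, so I am not sure it is the intended pattern either.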

 

0 REPLIES
