โ08-22-2023 05:41 AM - edited โ08-24-2023 01:16 AM
Hello
I suspect that this canยดt be done out of the box and want to know a way of doing this. I am trying to do it without success. So far I have tried this:
Based on this link, I have created a Class and an object (companion and not, both ways) for ciphering text in every worker of the cluster. It has some third party libraries implied so thats why I cant use serialization. And for initializating its values, I take some of them from an Azure Keyvault (this seems to work on driver but not in executors).
When I execute the cells declaring the class and the object it runs ok, this I think is because its the driver here, and it initializes ok its values from the KeyVault. But when using it for cipher a table I get an exception.
In my code I have a Spark UDF that makes use of that class for ciphering, like this:
val encrypt_long = udf[Long, Long](plain_long => {
val plain_texted = plain_long.toString
val plain_bytes = AESFPE.instance.numericAlphabetMapper.convertToIndexes(plain_texted.toCharArray)
val cipher_chars = AESFPE.instance.numericAlphabetMapper.convertToChars(AESFPE.instance.encrypt(AESFPE.instance.cipher, AESFPE.instance.sKey, AESFPE.instance.tweak, AESFPE.instance.numericRadix, plain_bytes))
new String(cipher_chars).toLong
}
And calling this UDF I get this exception:
Job aborted due to stage failure: [FAILED_EXECUTE_UDF] Failed to execute user defined function ($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$9794/149626391: (bigint) => bigint).
Caused by: [FAILED_EXECUTE_UDF] Failed to execute user defined function ($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$9794/149626391: (bigint) => bigint).
Caused by: NullPointerException:
With this cause for the exception:
Caused by: java.lang.NullPointerException
at $line1b09d7456fac420b8b404a7387bae2f8123.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$AESFPE.sKey$lzycompute(command-4335102701464202:16)
at $line1b09d7456fac420b8b404a7387bae2f8123.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$AESFPE.sKey(command-4335102701464202:16)
at $line1b09d7456fac420b8b404a7387bae2f8127.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$encrypt_long$1(command-4335102701464206:22)
at scala.runtime.java8.JFunction1$mcJJ$sp.apply(JFunction1$mcJJ$sp.java:23)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:761)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:464)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$3(FileFormatWriter.scala:316)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:142)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:142)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:97)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:904)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1713)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:907)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:761)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
There is a need here to have some of the same value configs in every worker for the cipher class, so they can encrypt and decrypt with the same configuration.
As another way arround, but not being a valid solution, I have made it work hardcoding these configuration values in the cipher class. But this is not valid because its not safe to have that kind of configuration in code, it must be in a place like a KeyVault. Using the KeyVault and serializing the cipher class is not an option because it has third-party code and throws not serializable exception.
โ08-31-2023 01:25 AM
I found a workaround for the problem to be able to use the secrets from the KeyVault in all the execturos. I only tested so far this in the notebooks, I want to try later in a JAR job.
First here is a link to the official documentation that highlights some limitations of dbutils API, the very one that Databricks recommends to read secrets.
Now the workaround:
Not being able to use dbutils from execturos, these are the steps I followed to bring the values of a KeyVault secret to an object initialization in every executor:
Reference a secret in an evnironmental variable, like this.
Prior to any other cell execution, set values in the same variable names from the driver, but in the executorยดs environments, like this:
spark.conf.set("spark.executorEnv.anon_pwd", sys.env("anon_pwd")) spark.conf.set("spark.executorEnv.anon_salt", sys.env("anon_salt")) spark.conf.set("spark.executorEnv.anon_tweak", sys.env("anon_tweak"))
Use the UDF function that makes use of the singleton object in every executor.
โ08-31-2023 01:25 AM
I found a workaround for the problem to be able to use the secrets from the KeyVault in all the execturos. I only tested so far this in the notebooks, I want to try later in a JAR job.
First here is a link to the official documentation that highlights some limitations of dbutils API, the very one that Databricks recommends to read secrets.
Now the workaround:
Not being able to use dbutils from execturos, these are the steps I followed to bring the values of a KeyVault secret to an object initialization in every executor:
Reference a secret in an evnironmental variable, like this.
Prior to any other cell execution, set values in the same variable names from the driver, but in the executorยดs environments, like this:
spark.conf.set("spark.executorEnv.anon_pwd", sys.env("anon_pwd")) spark.conf.set("spark.executorEnv.anon_salt", sys.env("anon_salt")) spark.conf.set("spark.executorEnv.anon_tweak", sys.env("anon_tweak"))
Use the UDF function that makes use of the singleton object in every executor.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group