01-24-2024 02:18 PM
I need to run this kind of code:
from pyspark.sql import SparkSession
import pandas as pd

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("David", 4), ("Eva", 5), ("Frank", 6)]
columns = ["Name", "Value"]
df = spark.createDataFrame(data, columns)

import aws_encryption_sdk


# Function to be applied on each partition
def decrypt_on_partition(iterator):
    # Convert the iterator to a Pandas DataFrame
    partition_df = pd.DataFrame(list(iterator), columns=columns)

    # decryption client and cacher
    KEY_ARN = _get_decrypt_key()
    key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[KEY_ARN])
    cache = aws_encryption_sdk.LocalCryptoMaterialsCache(LOCAL_CACHE)
    caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
        master_key_provider=key_provider,
        cache=cache,
        max_age=MAX_AGE_IN_SECONDS,
        max_messages_encrypted=MAX_ENTRY_MESSAGES,
    )
    client = aws_encryption_sdk.EncryptionSDKClient(
        commitment_policy=aws_encryption_sdk.CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
    )

    # Add an additional column to the Pandas DataFrame
    partition_df = partition_df.assign(
        bookingDict=partition_df["data"].apply(
            lambda x: _decrypt_string(x, client, caching_cmm)
        )
    )

    # Convert the Pandas DataFrame back to an iterator
    for row in partition_df.itertuples(index=False):
        yield tuple(row)


# Apply the function to each partition
result_rdd = df.rdd.mapPartitions(decrypt_on_partition)

# Collect the results back to the driver node (for demonstration purposes)
result_list = result_rdd.collect()

# Print the result
print(result_list)

# Stop the Spark session
spark.stop()
Why?
It works when I run it in a Databricks notebook. However, when I run it locally I get this error:
[NOT_IMPLEMENTED] rdd() is not implemented.
I also tried the Spark DataFrame API, without success, by doing this:
df = df.foreachPartition(decrypt_on_partition)
instead of:
result_rdd = df.rdd.mapPartitions(decrypt_on_partition)
result_list = result_rdd.collect()
Thanks a lot
01-29-2024 04:12 AM
I found a good solution that works both locally and in the cloud. Copy pasting the code in case it helps someone.
This is the higher level function in charge of partitioning the data and sending the data and the function fn to each node.
def decrypt_data(df: SparkDataFrame) -> SparkDataFrame:
    """Decrypts the data by partitioning and sending to each Spark node a pandas
    DataFrame. Note that the GroupBy does nothing but it's necessary with the
    current Spark DataFrame API.

    Parameters
    ----------
    df : SparkDataFrame
        The initial Spark DataFrame

    Returns
    -------
    SparkDataFrame
        The decrypted Spark DataFrame
    """
    # import here to avoid error in databricks - NameError: name 'spark' is not defined
    from databricks.sdk.runtime import dbutils

    KEY_ARN = dbutils.secrets.get(
        scope=f"mlc_{Cts.ENV}", key=f"KEY_ARN_{Cts.ENV.upper()}"
    )
    fn = decrypt_on_partition(key_arn=KEY_ARN)
    return df.groupBy().applyInPandas(fn, schema=get_schema())
Note that decrypt_on_partition returns a function with the key embedded in it:
def decrypt_on_partition(key_arn: str) -> Callable[[pd.DataFrame], pd.DataFrame]:
    """This function is just a wrapper around the actual function used in each Spark
    partition and it's used to embed the key ARN into each Spark node in a clean way.

    Parameters
    ----------
    key_arn : str
        The key

    Returns
    -------
    Callable[[pd.DataFrame], pd.DataFrame]
        The function that runs on each partition
    """

    def _decrypt_on_partition(partition_df: pd.DataFrame) -> pd.DataFrame:
        """The decrypt function called on each DataFrame partition. Note that it only
        makes sense to instantiate the decryption client and decryption cache inside
        this function, so that each Spark node has its own client and cache (they
        can't be shared at the cluster level).

        Parameters
        ----------
        partition_df : pd.DataFrame
            The partitioned data as a pandas DataFrame

        Returns
        -------
        pd.DataFrame
            The resulting pandas DataFrame
        """
        key_prov = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[key_arn])
        cache = aws_encryption_sdk.LocalCryptoMaterialsCache(LOCAL_CACHE)
        caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
            master_key_provider=key_prov,
            cache=cache,
            max_age=MAX_AGE_IN_SECONDS,
            max_messages_encrypted=MAX_ENTRY_MESSAGES,
        )
        client = aws_encryption_sdk.EncryptionSDKClient(
            commitment_policy=aws_encryption_sdk.CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
        )
        # Replace the encrypted "data" column with its decrypted value
        partition_df = partition_df.assign(
            data=partition_df["data"].apply(
                lambda x: _decrypt_string(x, client, caching_cmm)
            )
        )
        return partition_df

    return _decrypt_on_partition
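In case the closure pattern is unclear, here is a minimal sketch of it that runs without Spark or AWS. The names make_decoder and "example-key" are made up for illustration, and base64 decoding is only a stand-in for _decrypt_string; it is not how the real decryption works.

```python
import base64
from typing import Callable

import pandas as pd


def make_decoder(key_id: str) -> Callable[[pd.DataFrame], pd.DataFrame]:
    """Return a function that has key_id captured in its closure (hypothetical helper)."""

    def _decode(partition_df: pd.DataFrame) -> pd.DataFrame:
        # A real implementation would build the KMS client and cache here,
        # using key_id. Base64 is only a stand-in for the decryption step.
        decoded = partition_df["data"].apply(
            lambda x: base64.b64decode(x).decode("utf-8")
        )
        # key_id is added as a column only to show that the closure carries it.
        return partition_df.assign(data=decoded, key_id=key_id)

    return _decode


fn = make_decoder("example-key")  # placeholder value, not a real ARN
sample = pd.DataFrame({"data": [base64.b64encode(b"hello").decode("ascii")]})
result = fn(sample)
```

The returned fn takes a single pandas DataFrame, which is the shape applyInPandas expects, and the key travels with the function instead of being looked up on the worker.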
01-24-2024 02:23 PM
I tried to edit the previous post but couldn't. I just wanted to remove some lines of code that may be misleading. The following block at the beginning should be removed:
from pyspark.sql import SparkSession
import pandas as pd
# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("David", 4), ("Eva", 5), ("Frank", 6)]
columns = ["Name", "Value"]
df = spark.createDataFrame(data, columns)
And the last two statements are unnecessary:
# Print the result
print(result_list)
# Stop the Spark session
spark.stop()
Thanks again
01-24-2024 03:11 PM
Sorry, one more update on the problem. Using RDDs is among the listed limitations of Databricks Connect for Python: https://docs.databricks.com/en/dev-tools/databricks-connect/python/limitations.html
Do you know if there is any workaround? Or any way to achieve this?
Thanks
01-25-2024 02:53 AM
Hi @pablobd, In Shared access mode, Py4J security is enabled by default for security reasons, which restricts cer....
In your case, you’re trying to use the rdd() function on a DataFrame, which is not supported in some environments. Instead of using RDD-based operations, you might want to consider using DataFrame-based operations, which are generally more optimized and can take advantage of Spark’s built-in optimizations.
For the decryption part, PySpark provides functions like aes_encrypt and aes_decrypt for encryption .... However, if these functions do not meet your needs, you might need to define a custom function (UDF) for decryption. Please note that using Python UDFs might have performance implications compared to using Spark’s built-in functions.
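As one possible DataFrame-based route, here is a minimal sketch that assumes DataFrame.mapInPandas is available in your environment (please check this against the Databricks Connect limitations page for your version). The function name transform_batches and the "-processed" suffix are hypothetical placeholders; a real version would set up the decryption client where the comment indicates and decrypt instead of appending a suffix.

```python
from typing import Iterator

import pandas as pd


def transform_batches(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Expensive setup (for example a client and a cache) would go here,
    # so it runs once per call rather than once per row.
    suffix = "-processed"  # stand-in for real setup state
    for batch in batches:
        # Each batch is a pandas DataFrame holding a slice of the rows.
        yield batch.assign(data=batch["data"] + suffix)


# With a Spark DataFrame `df` that has a string column "data", the call
# would look like this (not executed here):
#   result = df.mapInPandas(transform_batches, schema=df.schema)

# Local check without Spark: feed the function an iterator of pandas batches.
chunks = [pd.DataFrame({"data": ["a", "b"]}), pd.DataFrame({"data": ["c"]})]
out = pd.concat(transform_batches(iter(chunks)), ignore_index=True)
```

Because the function receives an iterator of batches, it has the same shape as the mapPartitions function in the question, but it stays within the DataFrame API.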
I hope this helps! If you have more questions, feel free to ask. 😊
01-29-2024 05:11 AM
Thank you for sharing!