Error when accessing rdd of DataFrame

pablobd
Contributor

I need to run this kind of code:

 

from pyspark.sql import SparkSession
import pandas as pd
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("David", 4), ("Eva", 5), ("Frank", 6)]
columns = ["Name", "Value"]
df = spark.createDataFrame(data, columns)

# Function to be applied on each partition
def decrypt_on_partition(iterator):
    # Convert the iterator to a Pandas DataFrame
    partition_df = pd.DataFrame(list(iterator), columns=columns)

    # decryption client and cache (the cache constants and the _get_decrypt_key /
    # _decrypt_string helpers are defined elsewhere in the module)
    KEY_ARN = _get_decrypt_key()
    key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[KEY_ARN])
    cache = aws_encryption_sdk.LocalCryptoMaterialsCache(LOCAL_CACHE)
    caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
        master_key_provider=key_provider,
        cache=cache,
        max_age=MAX_AGE_IN_SECONDS,
        max_messages_encrypted=MAX_ENTRY_MESSAGES,
    )
    client = aws_encryption_sdk.EncryptionSDKClient(
        commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
    )
    
    # Add an additional column to the Pandas DataFrame
    partition_df = partition_df.assign(
        bookingDict=partition_df["data"].apply(
            lambda x: _decrypt_string(x, client, caching_cmm)
        )
    )
    
    # Convert the Pandas DataFrame back to an iterator
    for row in partition_df.itertuples(index=False):
        yield tuple(row)
        
# Apply the function to each partition
result_rdd = df.rdd.mapPartitions(decrypt_on_partition)

# Collect the results back to the driver node (for demonstration purposes)
result_list = result_rdd.collect()

# Print the result
print(result_list)

# Stop the Spark session
spark.stop()

 

Why?

Because with this code I send the caching service and encryption client to each node and run them on each partition of the DataFrame. Also, the decryption service can only be accessed from a Spark cluster with the appropriate AWS IAM role attached, and the cache used by the decryption service (to speed up decryption and save on costs) can't be run in Spark (that's why I convert the stream of data to a pandas.DataFrame).
 
Benefits:
  1. It runs 4 times faster (4 nodes in this cluster); all integration data is decrypted in 30 seconds.
  2. I should be able to decrypt the data from my local computer when connected to the appropriate cluster, and develop the ML model locally.

When running it in a Databricks notebook it works; however, locally I get this error:

[NOT_IMPLEMENTED] rdd() is not implemented.

I also tried to use the Spark DataFrame API, without success, doing this,

 

df = df.foreachPartition(decrypt_on_partition)

 

Instead of,

 

result_rdd = df.rdd.mapPartitions(decrypt_on_partition)
result_list = result_rdd.collect()

 

Thanks a lot


pablobd
Contributor

I tried to edit the previous post but I can't, so I just wanted to remove some lines of code that may be misleading. At the beginning, this block should be left out,

from pyspark.sql import SparkSession
import pandas as pd

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("David", 4), ("Eva", 5), ("Frank", 6)]
columns = ["Name", "Value"]
df = spark.createDataFrame(data, columns)

And the last lines are unnecessary,

# Print the result
print(result_list)

# Stop the Spark session
spark.stop()

Thanks again

pablobd
Contributor

Sorry, updating again about the problem: using RDDs is among the listed limitations of Databricks Connect for Python: https://docs.databricks.com/en/dev-tools/databricks-connect/python/limitations.html

Do you know if there is any workaround? Or any way to achieve this?
Thanks

Kaniz
Community Manager

Hi @pablobd, in Shared access mode, Py4J security is enabled by default for security reasons, which restricts cer....

 

In your case, you’re trying to use the rdd() function on a DataFrame, which is not supported in some environments. Instead of using RDD-based operations, you might want to consider using DataFrame-based operations, which are generally more optimized and can take advantage of Spark’s built-in optimizations.
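
Also note that DataFrame.foreachPartition is an action intended for side effects; it returns None, so it can't replace mapPartitions to produce a new DataFrame. A DataFrame-native equivalent of mapPartitions is mapInPandas (available from Spark 3.0). Here is a minimal sketch with a placeholder transformation; the per-partition decryption client and cache from your code would be instantiated inside the function:

from typing import Iterator

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("enc-1",), ("enc-2",)], ["data"])

def transform_batches(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Create the per-partition decryption client and cache here, then apply
    # them to each batch; str.upper() is only a stand-in for the real decryption.
    for pdf in batches:
        pdf["data"] = pdf["data"].str.upper()
        yield pdf

result_df = df.mapInPandas(transform_batches, schema=df.schema)
result_df.show()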

 

For the decryption part, PySpark provides built-in functions like aes_encrypt and aes_decrypt .... However, if these functions do not meet your needs, you might need to define a custom user-defined function (UDF) for decryption. Please note that Python UDFs might have performance implications compared to Spark's built-in functions.
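
For reference, a minimal round-trip sketch of those built-in AES functions (they are available as SQL functions from Spark 3.3 onwards; the key below is just a 16-character demo value, not a real secret):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
demo_key = "0123456789abcdef"  # 16-byte demo key, illustrative only

df = spark.createDataFrame([("secret value",)], ["plaintext"])
encrypted = df.withColumn(
    "ciphertext", F.expr(f"aes_encrypt(plaintext, '{demo_key}', 'GCM')")
)
decrypted = encrypted.withColumn(
    "roundtrip", F.expr(f"cast(aes_decrypt(ciphertext, '{demo_key}', 'GCM') as string)")
)
decrypted.select("plaintext", "roundtrip").show(truncate=False)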

 

I hope this helps! If you have more questions, feel free to ask. 😊

pablobd
Contributor

I found a good solution that works both locally and in the cloud. I'm copy-pasting the code in case it helps someone.

This is the higher-level function, in charge of partitioning the data and sending it, together with the function fn, to each node.

# Imports assumed from the rest of the module (Cts, get_schema, _decrypt_string
# and the cache constants are defined elsewhere):
from typing import Callable

import pandas as pd
import aws_encryption_sdk
from aws_encryption_sdk import EncryptionSDKClient
from pyspark.sql import DataFrame as SparkDataFrame


def decrypt_data(df: SparkDataFrame) -> SparkDataFrame:
    """Decrypts the data by partitioning and sending to each Spark node a pandas
    DataFrame. Note that the GroupBy does nothing but it's necessary with the
    current Spark DataFrame API.

    Parameters
    ----------
    df : SparkDataFrame
        The initial Spark DataFrame

    Returns
    -------
    SparkDataFrame
        The decrypted Spark DataFrame
    """
    # import here to avoid error in databricks - NameError: name 'spark' is not defined
    from databricks.sdk.runtime import dbutils

    KEY_ARN = dbutils.secrets.get(
        scope=f"mlc_{Cts.ENV}", key=f"KEY_ARN_{Cts.ENV.upper()}"
    )
    fn = decrypt_on_partition(key_arn=KEY_ARN)
    return df.groupBy().applyInPandas(fn, schema=get_schema())

Note that decrypt_on_partition returns a function with the key embedded into it:

def decrypt_on_partition(key_arn: str) -> Callable[[pd.DataFrame], pd.DataFrame]:
    """This function is just a wrapper around the actual function used in each Spark
    partition; it's used to embed the key ARN into each Spark node in a clean way.

    Parameters
    ----------
    key_arn : str
        The key

    Returns
    -------
    Callable[[pd.DataFrame], pd.DataFrame]
        The function that runs on each partition
    """

    def _decrypt_on_partition(partition_df: pd.DataFrame) -> pd.DataFrame:
        """The decrypt function called on each DataFrame partition. Note that it's only
        in this function that makes sense to instantiate the decryption client and
        decryption cache, so that each Spark node has its own client and cache (it
        can't be shared at the cluster level).

        Parameters
        ----------
        partition_df : pd.DataFrame
            The partitioned data as a pandas DataFrame

        Returns
        -------
        pd.DataFrame
            The resulting pandas DataFrame
        """
        key_prov = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[key_arn])
        cache = aws_encryption_sdk.LocalCryptoMaterialsCache(LOCAL_CACHE)
        caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
            master_key_provider=key_prov,
            cache=cache,
            max_age=MAX_AGE_IN_SECONDS,
            max_messages_encrypted=MAX_ENTRY_MESSAGES,
        )
        client = EncryptionSDKClient(
            commitment_policy=aws_encryption_sdk.CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
        )

        partition_df = (
            partition_df.assign(
                data=partition_df["data"].apply(
                    lambda x: _decrypt_string(x, client, caching_cmm)
                )
            )
        )

        return partition_df

    return _decrypt_on_partition
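
For completeness, a hypothetical call site (the table name is only illustrative, and get_schema() has to match the columns produced by the partition function):

encrypted_df = spark.read.table("my_encrypted_table")  # illustrative source
decrypted_df = decrypt_data(encrypted_df)
decrypted_df.show()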




Lakshay
Esteemed Contributor

Thank you for sharing!
