cancel
Showing results for 
Search instead for 
Did you mean: 
Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.
cancel
Showing results for 
Search instead for 
Did you mean: 

Error when accessing rdd of DataFrame

pablobd
Contributor II

I need to run this kind of code:

 

from pyspark.sql import SparkSession
import pandas as pd

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("David", 4), ("Eva", 5), ("Frank", 6)]
columns = ["Name", "Value"]
df = spark.createDataFrame(data, columns)

# Function to be applied on each partition
def decrypt_on_partition(iterator):
    # Convert the iterator to a Pandas DataFrame
    partition_df = pd.DataFrame(list(iterator), columns=columns)

    # decryption client and cacher
    KEY_ARN = _get_decrypt_key()
    key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[KEY_ARN])
    cache = aws_encryption_sdk.LocalCryptoMaterialsCache(LOCAL_CACHE)
    caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
        master_key_provider=key_provider,
        cache=cache,
        max_age=MAX_AGE_IN_SECONDS,
        max_messages_encrypted=MAX_ENTRY_MESSAGES,
    )
    client = aws_encryption_sdk.EncryptionSDKClient(
        commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
    )
    
    # Add an additional column to the Pandas DataFrame
    partition_df = partition_df.assign(
        bookingDict=partition_df["data"].apply(
            lambda x: _decrypt_string(x, client, caching_cmm)
        )
    
    # Convert the Pandas DataFrame back to an iterator
    for row in partition_df.itertuples(index=False):
        yield tuple(row)
        
# Apply the function to each partition
result_rdd = df.rdd.mapPartitions(decrypt_on_partition)

# Collect the results back to the driver node (for demonstration purposes)
result_list = result_rdd.collect()

# Print the result
print(result_list)

# Stop the Spark session
spark.stop()

 

Why?

Because with this code I send the caching service and encryption client to each node and run it on each partition of the DataFrame. Also, the decryption service can only be accessed from a Spark cluster with the  appropriate AWS IAM role associated. And the cacher used by the decryption service (to speed up decryption and save on costs) can't be run in Spark (that's why I convert the stream of data to a pandas.DataFrame).
 
Benefits:
  1. It runs 4 times faster (4 nodes in this cluster). All integ data decrypted in 30 seconds.
  2. I should be able to decrypt the data from my local computer if connected to the appropriate cluster and develop the ML Model locally.

When running it in a Databricks notebook it works, however, locally I get this error:

[NOT_IMPLEMENTED] rdd() is not implemented.

I also tried to use the spark.DataFrame api without success, doing this,

 

df = df.foreachPartition(decrypt_on_partition)

 

Instead of,

 

result_rdd = df.rdd.mapPartitions(decrypt_on_partition)
result_list = result_rdd.collect()

 

Thanks a lot

1 ACCEPTED SOLUTION

Accepted Solutions

pablobd
Contributor II

I found a good solution that works both locally and in the cloud. Copy pasting the code in case it helps someone.

This is the higher level function in charge of partitioning the data and sending the data and the function fn to each node. 

def decrypt_data(df: SparkDataFrame) -> SparkDataFrame:
    """Decrypts the data by partitioning and sending to each Spark node a pandas
    DataFrame. Note that the GroupBy does nothing but it's necessary with the
    current Spark DataFrame API.

    Parameters
    ----------
    df : SparkDataFrame
        The initial Spark DataFrame

    Returns
    -------
    SparkDataFrame
        The decrypted Spark DataFrame
    """
    # import here to avoid error in databricks - NameError: name 'spark' is not defined
    from databricks.sdk.runtime import dbutils

    KEY_ARN = dbutils.secrets.get(
        scope=f"mlc_{Cts.ENV}", key=f"KEY_ARN_{Cts.ENV.upper()}"
    )
    fn = decrypt_on_partition(key_arn=KEY_ARN)
    return df.groupBy().applyInPandas(fn, schema=get_schema())

 See that decrypt_on_partition returns a function with they key embedded into it:

def decrypt_on_partition(key_arn: str) -> Callable[[pd.DataFrame], pd.DataFrame]:
    """This function is just a wrapper around the actual function used in each Spark
    partition and it's used to embedd the ARN KEY into each Spark node in a clean way.

    Parameters
    ----------
    key_arn : str
        The key

    Returns
    -------
    Callable[[pd.DataFrame], pd.DataFrame]
        The function that runs on each partition
    """

    def _decrypt_on_partition(partition_df: pd.DataFrame) -> pd.DataFrame:
        """The decrypt function called on each DataFrame partition. Note that it's only
        in this function that makes sense to instantiate the decryption client and
        decryption cache, so that each Spark node has its own client and cache (it
        can't be share at the cluster level).

        Parameters
        ----------
        partition_df : pd.DataFrame
            The partitioned data as a pandas DataFrame

        Returns
        -------
        pd.DataFrame
            The resulting pandas DataFrame
        """
        key_prov = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[key_arn])
        cache = aws_encryption_sdk.LocalCryptoMaterialsCache(LOCAL_CACHE)
        caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
            master_key_provider=key_prov,
            cache=cache,
            max_age=MAX_AGE_IN_SECONDS,
            max_messages_encrypted=MAX_ENTRY_MESSAGES,
        )
        client = EncryptionSDKClient(
            commitment_policy=aws_encryption_sdk.CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
        )

        partition_df = (
            partition_df.assign(
                data=partition_df["data"].apply(
                    lambda x: _decrypt_string(x, client, caching_cmm)
                )
            )
        )

        return partition_df

    return _decrypt_on_partition




View solution in original post

4 REPLIES 4

pablobd
Contributor II

I tried to edit the previous one but I can't. Just wanted to remove some lines of code that may be misleading. At the beginning this block should be out,

from pyspark.sql import SparkSession
import pandas as pd

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("David", 4), ("Eva", 5), ("Frank", 6)]
columns = ["Name", "Value"]
df = spark.createDataFrame(data, columns)

And the last two lines are unnecessary,

# Print the result
print(result_list)

# Stop the Spark session
spark.stop()

Thanks again

pablobd
Contributor II

Sorry, again updating about the problem. Using RDDs is among the listed limitations of Databricks Connect for Python: https://docs.databricks.com/en/dev-tools/databricks-connect/python/limitations.html

Do you know if there is any workaround? Or any way to achieve this?
Thanks

pablobd
Contributor II

I found a good solution that works both locally and in the cloud. Copy pasting the code in case it helps someone.

This is the higher level function in charge of partitioning the data and sending the data and the function fn to each node. 

def decrypt_data(df: SparkDataFrame) -> SparkDataFrame:
    """Decrypts the data by partitioning and sending to each Spark node a pandas
    DataFrame. Note that the GroupBy does nothing but it's necessary with the
    current Spark DataFrame API.

    Parameters
    ----------
    df : SparkDataFrame
        The initial Spark DataFrame

    Returns
    -------
    SparkDataFrame
        The decrypted Spark DataFrame
    """
    # import here to avoid error in databricks - NameError: name 'spark' is not defined
    from databricks.sdk.runtime import dbutils

    KEY_ARN = dbutils.secrets.get(
        scope=f"mlc_{Cts.ENV}", key=f"KEY_ARN_{Cts.ENV.upper()}"
    )
    fn = decrypt_on_partition(key_arn=KEY_ARN)
    return df.groupBy().applyInPandas(fn, schema=get_schema())

 See that decrypt_on_partition returns a function with they key embedded into it:

def decrypt_on_partition(key_arn: str) -> Callable[[pd.DataFrame], pd.DataFrame]:
    """This function is just a wrapper around the actual function used in each Spark
    partition and it's used to embedd the ARN KEY into each Spark node in a clean way.

    Parameters
    ----------
    key_arn : str
        The key

    Returns
    -------
    Callable[[pd.DataFrame], pd.DataFrame]
        The function that runs on each partition
    """

    def _decrypt_on_partition(partition_df: pd.DataFrame) -> pd.DataFrame:
        """The decrypt function called on each DataFrame partition. Note that it's only
        in this function that makes sense to instantiate the decryption client and
        decryption cache, so that each Spark node has its own client and cache (it
        can't be share at the cluster level).

        Parameters
        ----------
        partition_df : pd.DataFrame
            The partitioned data as a pandas DataFrame

        Returns
        -------
        pd.DataFrame
            The resulting pandas DataFrame
        """
        key_prov = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[key_arn])
        cache = aws_encryption_sdk.LocalCryptoMaterialsCache(LOCAL_CACHE)
        caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
            master_key_provider=key_prov,
            cache=cache,
            max_age=MAX_AGE_IN_SECONDS,
            max_messages_encrypted=MAX_ENTRY_MESSAGES,
        )
        client = EncryptionSDKClient(
            commitment_policy=aws_encryption_sdk.CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
        )

        partition_df = (
            partition_df.assign(
                data=partition_df["data"].apply(
                    lambda x: _decrypt_string(x, client, caching_cmm)
                )
            )
        )

        return partition_df

    return _decrypt_on_partition




Lakshay
Databricks Employee
Databricks Employee

Thank you for sharing!

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group