<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Error when accessing rdd of DataFrame in Machine Learning</title>
    <link>https://community.databricks.com/t5/machine-learning/error-when-accessing-rdd-of-dataframe/m-p/58367#M2905</link>
    <description>&lt;P&gt;I need to run this kind of code:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
import pandas as pd

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("David", 4), ("Eva", 5), ("Frank", 6)]
columns = ["Name", "Value"]
df = spark.createDataFrame(data, columns)

# Function to be applied on each partition
def decrypt_on_partition(iterator):
    # Convert the iterator to a Pandas DataFrame
    partition_df = pd.DataFrame(list(iterator), columns=columns)

    # decryption client and cacher
    KEY_ARN = _get_decrypt_key()
    key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[KEY_ARN])
    cache = aws_encryption_sdk.LocalCryptoMaterialsCache(LOCAL_CACHE)
    caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
        master_key_provider=key_provider,
        cache=cache,
        max_age=MAX_AGE_IN_SECONDS,
        max_messages_encrypted=MAX_ENTRY_MESSAGES,
    )
    client = aws_encryption_sdk.EncryptionSDKClient(
        commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
    )
    
    # Add an additional column to the Pandas DataFrame
    partition_df = partition_df.assign(
        bookingDict=partition_df["data"].apply(
            lambda x: _decrypt_string(x, client, caching_cmm)
        )
    
    # Convert the Pandas DataFrame back to an iterator
    for row in partition_df.itertuples(index=False):
        yield tuple(row)
        
# Apply the function to each partition
result_rdd = df.rdd.mapPartitions(decrypt_on_partition)

# Collect the results back to the driver node (for demonstration purposes)
result_list = result_rdd.collect()

# Print the result
print(result_list)

# Stop the Spark session
spark.stop()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Why?&lt;/P&gt;&lt;DIV class=""&gt;Because with this code I send the caching service and encryption client to each node and run it on each partition of the DataFrame. Also, t&lt;SPAN&gt;he decryption service can only be accessed from a Spark cluster with the&amp;nbsp; appropriate AWS IAM role associated. And the cacher&amp;nbsp;&lt;/SPAN&gt;used by the decryption service (to speed up decryption and save on costs) can't be run in Spark (that's why I convert the stream of data to a pandas.DataFrame).&lt;/DIV&gt;&lt;DIV class=""&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV class=""&gt;Benefits:&lt;/DIV&gt;&lt;OL class=""&gt;&lt;LI&gt;It runs 4 times faster (4 nodes in this cluster). All integ data decrypted in 30 seconds.&lt;/LI&gt;&lt;LI&gt;I should be able to decrypt the data from my local computer if connected to the appropriate cluster and develop the ML Model locally.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;When running it in a Databricks notebook it works, however, locally I get this error:&lt;BR /&gt;&lt;BR /&gt;&lt;STRONG&gt;[NOT_IMPLEMENTED] rdd() is not implemented.&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;I also tried to use the spark.DataFrame api without success, doing this,&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;df = df.foreachPartition(decrypt_on_partition)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Instead of,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;result_rdd = df.rdd.mapPartitions(decrypt_on_partition)
result_list = result_rdd.collect()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Thanks a lot&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;</description>
    <pubDate>Wed, 24 Jan 2024 22:18:31 GMT</pubDate>
    <dc:creator>pablobd</dc:creator>
    <dc:date>2024-01-24T22:18:31Z</dc:date>
    <item>
      <title>Error when accessing rdd of DataFrame</title>
      <link>https://community.databricks.com/t5/machine-learning/error-when-accessing-rdd-of-dataframe/m-p/58367#M2905</link>
      <description>&lt;P&gt;I need to run this kind of code:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
import pandas as pd

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("David", 4), ("Eva", 5), ("Frank", 6)]
columns = ["Name", "Value"]
df = spark.createDataFrame(data, columns)

# Function to be applied on each partition
def decrypt_on_partition(iterator):
    # Convert the iterator to a Pandas DataFrame
    partition_df = pd.DataFrame(list(iterator), columns=columns)

    # decryption client and cacher
    KEY_ARN = _get_decrypt_key()
    key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[KEY_ARN])
    cache = aws_encryption_sdk.LocalCryptoMaterialsCache(LOCAL_CACHE)
    caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
        master_key_provider=key_provider,
        cache=cache,
        max_age=MAX_AGE_IN_SECONDS,
        max_messages_encrypted=MAX_ENTRY_MESSAGES,
    )
    client = aws_encryption_sdk.EncryptionSDKClient(
        commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
    )
    
    # Add an additional column to the Pandas DataFrame
    partition_df = partition_df.assign(
        bookingDict=partition_df["data"].apply(
            lambda x: _decrypt_string(x, client, caching_cmm)
        )
    
    # Convert the Pandas DataFrame back to an iterator
    for row in partition_df.itertuples(index=False):
        yield tuple(row)
        
# Apply the function to each partition
result_rdd = df.rdd.mapPartitions(decrypt_on_partition)

# Collect the results back to the driver node (for demonstration purposes)
result_list = result_rdd.collect()

# Print the result
print(result_list)

# Stop the Spark session
spark.stop()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Why?&lt;/P&gt;&lt;DIV class=""&gt;Because with this code I send the caching service and encryption client to each node and run it on each partition of the DataFrame. Also, t&lt;SPAN&gt;he decryption service can only be accessed from a Spark cluster with the&amp;nbsp; appropriate AWS IAM role associated. And the cacher&amp;nbsp;&lt;/SPAN&gt;used by the decryption service (to speed up decryption and save on costs) can't be run in Spark (that's why I convert the stream of data to a pandas.DataFrame).&lt;/DIV&gt;&lt;DIV class=""&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV class=""&gt;Benefits:&lt;/DIV&gt;&lt;OL class=""&gt;&lt;LI&gt;It runs 4 times faster (4 nodes in this cluster). All integ data decrypted in 30 seconds.&lt;/LI&gt;&lt;LI&gt;I should be able to decrypt the data from my local computer if connected to the appropriate cluster and develop the ML Model locally.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;When running it in a Databricks notebook it works, however, locally I get this error:&lt;BR /&gt;&lt;BR /&gt;&lt;STRONG&gt;[NOT_IMPLEMENTED] rdd() is not implemented.&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;I also tried to use the spark.DataFrame api without success, doing this,&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;df = df.foreachPartition(decrypt_on_partition)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Instead of,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;result_rdd = df.rdd.mapPartitions(decrypt_on_partition)
result_list = result_rdd.collect()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Thanks a lot&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 24 Jan 2024 22:18:31 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/error-when-accessing-rdd-of-dataframe/m-p/58367#M2905</guid>
      <dc:creator>pablobd</dc:creator>
      <dc:date>2024-01-24T22:18:31Z</dc:date>
    </item>
    <item>
      <title>Re: Error when accessing rdd of DataFrame</title>
      <link>https://community.databricks.com/t5/machine-learning/error-when-accessing-rdd-of-dataframe/m-p/58368#M2906</link>
      <description>&lt;P&gt;I tried to edit the previous one but I can't. Just wanted to remove some lines of code that may be misleading. At the beginning this block should be out,&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
import pandas as pd

# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("David", 4), ("Eva", 5), ("Frank", 6)]
columns = ["Name", "Value"]
df = spark.createDataFrame(data, columns)&lt;/LI-CODE&gt;&lt;P&gt;And the last two lines are unnecessary,&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# Print the result
print(result_list)

# Stop the Spark session
spark.stop()&lt;/LI-CODE&gt;&lt;P&gt;Thanks again&lt;/P&gt;</description>
      <pubDate>Wed, 24 Jan 2024 22:23:09 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/error-when-accessing-rdd-of-dataframe/m-p/58368#M2906</guid>
      <dc:creator>pablobd</dc:creator>
      <dc:date>2024-01-24T22:23:09Z</dc:date>
    </item>
    <item>
      <title>Re: Error when accessing rdd of DataFrame</title>
      <link>https://community.databricks.com/t5/machine-learning/error-when-accessing-rdd-of-dataframe/m-p/58371#M2907</link>
      <description>&lt;P&gt;Sorry, again updating about the problem. Using RDDs is among the listed limitations of Databricks Connect for Python:&amp;nbsp;&lt;A href="https://docs.databricks.com/en/dev-tools/databricks-connect/python/limitations.html" target="_blank"&gt;https://docs.databricks.com/en/dev-tools/databricks-connect/python/limitations.html&lt;/A&gt;&lt;BR /&gt;&lt;BR /&gt;Do you know if there is any workaround? Or any way to achieve this?&lt;BR /&gt;Thanks&lt;/P&gt;</description>
      <pubDate>Wed, 24 Jan 2024 23:11:01 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/error-when-accessing-rdd-of-dataframe/m-p/58371#M2907</guid>
      <dc:creator>pablobd</dc:creator>
      <dc:date>2024-01-24T23:11:01Z</dc:date>
    </item>
    <item>
      <title>Re: Error when accessing rdd of DataFrame</title>
      <link>https://community.databricks.com/t5/machine-learning/error-when-accessing-rdd-of-dataframe/m-p/58607#M2915</link>
      <description>&lt;P&gt;I found a good solution that works both locally and in the cloud. Copy pasting the code in case it helps someone.&lt;/P&gt;&lt;P&gt;This is the higher level function in charge of partitioning the data and sending the data and the function fn to each node.&amp;nbsp;&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def decrypt_data(df: SparkDataFrame) -&amp;gt; SparkDataFrame:
    """Decrypts the data by partitioning and sending to each Spark node a pandas
    DataFrame. Note that the GroupBy does nothing but it's necessary with the
    current Spark DataFrame API.

    Parameters
    ----------
    df : SparkDataFrame
        The initial Spark DataFrame

    Returns
    -------
    SparkDataFrame
        The decrypted Spark DataFrame
    """
    # import here to avoid error in databricks - NameError: name 'spark' is not defined
    from databricks.sdk.runtime import dbutils

    KEY_ARN = dbutils.secrets.get(
        scope=f"mlc_{Cts.ENV}", key=f"KEY_ARN_{Cts.ENV.upper()}"
    )
    fn = decrypt_on_partition(key_arn=KEY_ARN)
    return df.groupBy().applyInPandas(fn, schema=get_schema())&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;See that decrypt_on_partition returns a function with they key embedded into it:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def decrypt_on_partition(key_arn: str) -&amp;gt; Callable[[pd.DataFrame], pd.DataFrame]:
    """This function is just a wrapper around the actual function used in each Spark
    partition and it's used to embedd the ARN KEY into each Spark node in a clean way.

    Parameters
    ----------
    key_arn : str
        The key

    Returns
    -------
    Callable[[pd.DataFrame], pd.DataFrame]
        The function that runs on each partition
    """

    def _decrypt_on_partition(partition_df: pd.DataFrame) -&amp;gt; pd.DataFrame:
        """The decrypt function called on each DataFrame partition. Note that it's only
        in this function that makes sense to instantiate the decryption client and
        decryption cache, so that each Spark node has its own client and cache (it
        can't be share at the cluster level).

        Parameters
        ----------
        partition_df : pd.DataFrame
            The partitioned data as a pandas DataFrame

        Returns
        -------
        pd.DataFrame
            The resulting pandas DataFrame
        """
        key_prov = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[key_arn])
        cache = aws_encryption_sdk.LocalCryptoMaterialsCache(LOCAL_CACHE)
        caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
            master_key_provider=key_prov,
            cache=cache,
            max_age=MAX_AGE_IN_SECONDS,
            max_messages_encrypted=MAX_ENTRY_MESSAGES,
        )
        client = EncryptionSDKClient(
            commitment_policy=aws_encryption_sdk.CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
        )

        partition_df = (
            partition_df.assign(
                data=partition_df["data"].apply(
                    lambda x: _decrypt_string(x, client, caching_cmm)
                )
            )
        )

        return partition_df

    return _decrypt_on_partition&lt;/LI-CODE&gt;&lt;P&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 29 Jan 2024 12:12:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/error-when-accessing-rdd-of-dataframe/m-p/58607#M2915</guid>
      <dc:creator>pablobd</dc:creator>
      <dc:date>2024-01-29T12:12:52Z</dc:date>
    </item>
    <item>
      <title>Re: Error when accessing rdd of DataFrame</title>
      <link>https://community.databricks.com/t5/machine-learning/error-when-accessing-rdd-of-dataframe/m-p/58609#M2917</link>
      <description>&lt;P&gt;Thank you for sharing!&lt;/P&gt;</description>
      <pubDate>Mon, 29 Jan 2024 13:11:37 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/error-when-accessing-rdd-of-dataframe/m-p/58609#M2917</guid>
      <dc:creator>Lakshay</dc:creator>
      <dc:date>2024-01-29T13:11:37Z</dc:date>
    </item>
  </channel>
</rss>

