<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic UDF LLM DataBrick pickle error in Machine Learning</title>
    <link>https://community.databricks.com/t5/machine-learning/udf-llm-databrick-pickle-error/m-p/98370#M3780</link>
    <description>&lt;P&gt;Hi there,&lt;/P&gt;&lt;P&gt;I am trying to parallelize a text extraction via the Databricks foundation model.&lt;/P&gt;&lt;P&gt;Any pointers to suggestions or examples are welcome&lt;/P&gt;&lt;P&gt;The code and error below.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;model = "databricks-meta-llama-3-1-70b-instruct"
temperature=0.0
max_tokens=1024

schema_llm = StructType([
    StructField("contains_vulnerability", BooleanType(), True),
])

chat_model = ChatDatabricks(
            endpoint=model,
            temperature=temperature,
            max_tokens=max_tokens
        )

chain_llm: LLMChain = (chat_prompt | chat_model.with_structured_output(VulnerabilityReport))

@udf(returnType=schema_llm) 
def CheckContent(text:str): 
    out = chain_llm.invoke({"content":text})
    return (out["contains_vulnerability"])
    
expand_df = sample_df.withColumn("content_check", CheckContent("file_content"))
display(expand_df)&lt;/LI-CODE&gt;&lt;P&gt;And I am getting a pickle error:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 559, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/core/context.py", line 525, in __getnewargs__
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
    <pubDate>Mon, 11 Nov 2024 19:34:53 GMT</pubDate>
    <dc:creator>llmnerd</dc:creator>
    <dc:date>2024-11-11T19:34:53Z</dc:date>
    <item>
      <title>UDF LLM DataBrick pickle error</title>
      <link>https://community.databricks.com/t5/machine-learning/udf-llm-databrick-pickle-error/m-p/98370#M3780</link>
      <description>&lt;P&gt;Hi there,&lt;/P&gt;&lt;P&gt;I am trying to parallelize a text extraction via the Databricks foundation model.&lt;/P&gt;&lt;P&gt;Any pointers to suggestions or examples are welcome&lt;/P&gt;&lt;P&gt;The code and error below.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;model = "databricks-meta-llama-3-1-70b-instruct"
temperature=0.0
max_tokens=1024

schema_llm = StructType([
    StructField("contains_vulnerability", BooleanType(), True),
])

chat_model = ChatDatabricks(
            endpoint=model,
            temperature=temperature,
            max_tokens=max_tokens
        )

chain_llm: LLMChain = (chat_prompt | chat_model.with_structured_output(VulnerabilityReport))

@udf(returnType=schema_llm) 
def CheckContent(text:str): 
    out = chain_llm.invoke({"content":text})
    return (out["contains_vulnerability"])
    
expand_df = sample_df.withColumn("content_check", CheckContent("file_content"))
display(expand_df)&lt;/LI-CODE&gt;&lt;P&gt;And I am getting a pickle error:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 559, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/core/context.py", line 525, in __getnewargs__
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 11 Nov 2024 19:34:53 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/udf-llm-databrick-pickle-error/m-p/98370#M3780</guid>
      <dc:creator>llmnerd</dc:creator>
      <dc:date>2024-11-11T19:34:53Z</dc:date>
    </item>
    <item>
      <title>Re: UDF LLM DataBrick pickle error</title>
      <link>https://community.databricks.com/t5/machine-learning/udf-llm-databrick-pickle-error/m-p/104351#M3884</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/132004"&gt;@llmnerd&lt;/a&gt;&amp;nbsp;, Hope you are doing well!&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;Upon reviewing the details provided, we have identified several observations regarding the SparkContext serialization error encountered. Please find a detailed analysis and our recommendations below:&lt;/SPAN&gt;&lt;BR /&gt;&lt;STRONG&gt;==== ANALYSIS ====&lt;/STRONG&gt;&lt;BR /&gt;&lt;STRONG&gt;Error Encountered:&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;An error occurred indicating that the SparkContext object could not be serialized. This typically occurs when SparkContext is referenced from a broadcast variable, action, or transformation, which is only permissible on the driver and not on the worker nodes.&lt;/SPAN&gt;&lt;BR /&gt;&lt;STRONG&gt;Analysis of the Problematic Code:&lt;/STRONG&gt;&lt;BR /&gt;&lt;STRONG&gt;1.Broadcast Variable Initialization:&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;broadcast_var = spark.sparkContext.broadcast([cloudTrailSchema, parquetOutputPath])&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;This line attempts to broadcast cloudTrailSchema and parquetOutputPath to all worker nodes, which is a valid approach for making configuration data available cluster-wide.&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;STRONG&gt;2. RDD Creation:&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;rdd = spark.sparkContext.parallelize([cloudTrailSchema, parquetOutputPath])&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;Here, the intent seems to be to distribute these objects for parallel processing, which is conceptually incorrect. Instead, creating an RDD of actual file paths would be appropriate: rdd = spark.sparkContext.parallelize(file_paths)&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;STRONG&gt;3.Data Processing:&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;result = rdd.mapPartitions(process_partition).collect()&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;This line processes the RDD created in line 2 using the mapPartitions method, which is intended to apply a function to each partition of the RDD. 
The function process_partition attempts to process data using the broadcast variables.&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;STRONG&gt;Proposed Correction:&lt;/STRONG&gt;&lt;BR /&gt;&lt;SPAN&gt;def process_partition(iterator):&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;broadcasted_values = broadcast_var.value&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp;&amp;nbsp; schema, output_path = broadcasted_values&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp;&amp;nbsp; for file_path in iterator:&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; process_file(file_path, schema, output_path)&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;This function calls process_file in which the sparkContext is being used:&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;df = spark.read.schema(cloudTrailSchema).json(file_path)&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;This is not a valid approach as&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;SparkContext can only be used on the driver node and cannot be serialized or accessed on the worker nodes.&lt;/STRONG&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;STRONG&gt;== Root Cause Analysis ==&lt;/STRONG&gt;&lt;BR /&gt;&lt;SPAN&gt;Referencing SparkContext within actions or transformations leads to serialization errors, as these operations execute on worker nodes where SparkContext is unavailable.&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;STRONG&gt;===== Solution ======&lt;/STRONG&gt;&lt;BR /&gt;&lt;SPAN&gt;Revise the process_file function to avoid SparkContext access on the workers. Consider using Python’s ThreadPool Executor for achieving concurrency, which does not involve SparkContext operations on worker nodes.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P class="p1"&gt;Please let me know if this helps and leave a like if this information is useful, followups are appreciated.&lt;/P&gt;
&lt;P class="p1"&gt;Kudos&lt;/P&gt;
&lt;P class="p1"&gt;Ayushi&lt;/P&gt;</description>
      <pubDate>Mon, 06 Jan 2025 13:48:19 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/udf-llm-databrick-pickle-error/m-p/104351#M3884</guid>
      <dc:creator>Ayushi_Suthar</dc:creator>
      <dc:date>2025-01-06T13:48:19Z</dc:date>
    </item>
  </channel>
</rss>

