<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStre in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/task-not-serializable-java-io-notserializableexception-org/m-p/66087#M33013</link>
    <description>&lt;P&gt;Thank you, Werner. You are perfectly right! I had a similar error but couldn't figure out why, and the PlaceIQ article you shared solved it. It was indeed caused by my objects; adding `extends Serializable` resolved the error.&lt;/P&gt;</description>
    <pubDate>Thu, 11 Apr 2024 18:22:43 GMT</pubDate>
    <dc:creator>Jinfei</dc:creator>
    <dc:date>2024-04-11T18:22:43Z</dc:date>
    <item>
      <title>Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter</title>
      <link>https://community.databricks.com/t5/data-engineering/task-not-serializable-java-io-notserializableexception-org/m-p/19332#M12940</link>
      <description>&lt;P&gt;I have a getS3Objects function that fetches (JSON) objects stored in AWS S3:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;object client_connect extends Serializable {
     val s3_get_path = "/dbfs/mnt/s3response"  
     def getS3Objects(s3ObjectName: String, s3Client: AmazonS3): String = {
       val objectKey = s"${s3ObjectName}"
       val inputS3Stream = s3Client.getObject("myS3Bucket", objectKey).getObjectContent
       val inputS3String = IOUtils.toString(inputS3Stream, "UTF-8")
       val filePath = s"${s3_get_path}/${objectKey}"
       val file = new File(filePath)
       val fileWriter = new FileWriter(file)
       val bw = new BufferedWriter(fileWriter)
       bw.write(inputS3String)
       bw.close()
       fileWriter.close()
       inputS3String
     } 
    }&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Messages are ingested using a streaming framework.&lt;/P&gt;&lt;P&gt;The source streaming DataFrame 'source_df', which reads from Azure Event Hubs, looks like this:&lt;/P&gt;&lt;P&gt;|body|&lt;/P&gt;&lt;P&gt;|8c44f2715ab81c16ecb31d527e18465d.json~2021-05-26~13-14-56~OH|&lt;/P&gt;&lt;P&gt;|a4f9e914c1a40e5828b0eb129b1234b2.json~2022=05-09~15-12-22~MI|&lt;/P&gt;&lt;P&gt;The 'body' column contains string values delimited by ‘~’, where the first element is the object id that is passed as the first parameter to the getS3Objects function.&lt;/P&gt;&lt;P&gt;The second parameter is the S3 client used to connect to AWS S3, which is built inside a serializable class:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;final class s3clientBuild() extends Serializable {
   def s3connection(AccessKey: String, SecretKey: String) = {
    val clientRegion: Regions = Regions.US_EAST_1
    val creds = new BasicAWSCredentials(AccessKey, SecretKey)
    AmazonS3ClientBuilder.standard()
   .withRegion(clientRegion)
   .withCredentials(new AWSStaticCredentialsProvider(creds))
   .build()  
   }
  }&lt;/CODE&gt;&lt;/PRE&gt;&lt;PRE&gt;&lt;CODE&gt;val AccessKey = dbutils.secrets.get(scope = "Scope", key = "AccessKey-ID")
val SecretKey = dbutils.secrets.get(scope = "Scope", key = "AccessKey-Secret")&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Write stream:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;val streamWriter = source_df
   .writeStream
   .queryName("Write_stream")
   .option("checkpointLocation", chk_pt)
   .trigger(Trigger.ProcessingTime(3 seconds))
   .outputMode("append")
   .foreachBatch(
     (batchDF: DataFrame, batchId: Long) =&amp;gt; Try {
       batchDF.persist()

       // The object id is the first '~'-delimited element of 'body'
       val object_df = batchDF.select(split(col("body"), "~").getItem(0).as("ObjectID"))

       val df_response = object_df.repartition(2).mapPartitions(iterator =&amp;gt; {
         // Build one S3 client per partition
         val api_connect = new s3clientBuild()
         val s3client = api_connect.s3connection(AccessKey, SecretKey)
         iterator.map(row =&amp;gt; client_connect.getS3Objects(row.getString(0), s3client))
       }).toDF("value").select(from_json($"value".cast("string"), MySchema) as "fields").select($"fields.*")

       // Force evaluation of the batch
       df_response.count()

       batchDF.unpersist()
     } match {
       case Success(_) =&amp;gt;
       case Failure(e) =&amp;gt; throw e
     }
   )
&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;However, I get the error message below:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter
  Serialization stack:
  	- object not serializable (class: org.apache.spark.sql.streaming.DataStreamWriter, value: org.apache.spark.sql.streaming.DataStreamWriter@1f1c5f4f)
  	- field (class: $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: sensorwriter_0526, type: class org.apache.spark.sql.streaming.DataStreamWriter)
  	- object (class $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@41c5f54f)
  	- element of array (index: 0)
  	- array (class [Ljava.lang.Object;, size 1)
  	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  	- object (class java.lang.invoke.SerializedLambda)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;How can this be resolved?&lt;/P&gt;</description>
      <pubDate>Fri, 27 May 2022 19:39:10 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/task-not-serializable-java-io-notserializableexception-org/m-p/19332#M12940</guid>
      <dc:creator>Sandesh87</dc:creator>
      <dc:date>2022-05-27T19:39:10Z</dc:date>
    </item>
    <item>
      <title>Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter</title>
      <link>https://community.databricks.com/t5/data-engineering/task-not-serializable-java-io-notserializableexception-org/m-p/19333#M12941</link>
      <description>&lt;P&gt;@Sandesh Puligundla, hard to tell, but I am pretty sure it is because of the objects you wrote.&lt;/P&gt;&lt;P&gt;Keep in mind that Spark is distributed: whatever a closure references has to be serialized and sent to the executors.&lt;/P&gt;&lt;P&gt;You might want to check out these links:&lt;/P&gt;&lt;P&gt;&lt;A href="https://www.placeiq.com/2017/11/how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs/" alt="https://www.placeiq.com/2017/11/how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs/" target="_blank"&gt;https://www.placeiq.com/2017/11/how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs/&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;A href="https://stackoverflow.com/questions/40596871/how-spark-handles-object" alt="https://stackoverflow.com/questions/40596871/how-spark-handles-object" target="_blank"&gt;https://stackoverflow.com/questions/40596871/how-spark-handles-object&lt;/A&gt;&lt;/P&gt;&lt;P&gt;Not an actual answer to your question, sorry. This error is hard to pinpoint (for me anyway; hopefully some good coders can solve this).&lt;/P&gt;</description>
      <pubDate>Mon, 30 May 2022 08:00:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/task-not-serializable-java-io-notserializableexception-org/m-p/19333#M12941</guid>
      <dc:creator>-werners-</dc:creator>
      <dc:date>2022-05-30T08:00:11Z</dc:date>
    </item>
    <item>
      <title>Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter</title>
      <link>https://community.databricks.com/t5/data-engineering/task-not-serializable-java-io-notserializableexception-org/m-p/19334#M12942</link>
      <description>&lt;P&gt;Hey there @Sandesh Puligundla&lt;/P&gt;&lt;P&gt;Hope all is well!&lt;/P&gt;&lt;P&gt;Just wanted to check in: were you able to resolve your issue? If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.&lt;/P&gt;&lt;P&gt;We'd love to hear from you.&lt;/P&gt;&lt;P&gt;Thanks!&lt;/P&gt;</description>
      <pubDate>Thu, 28 Jul 2022 17:13:48 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/task-not-serializable-java-io-notserializableexception-org/m-p/19334#M12942</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2022-07-28T17:13:48Z</dc:date>
    </item>
    <item>
      <title>Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStre</title>
      <link>https://community.databricks.com/t5/data-engineering/task-not-serializable-java-io-notserializableexception-org/m-p/66087#M33013</link>
      <description>&lt;P&gt;Thank you, Werner. You are perfectly right! I had a similar error but couldn't figure out why, and the PlaceIQ article you shared solved it. It was indeed caused by my objects; adding `extends Serializable` resolved the error.&lt;/P&gt;</description>
      <pubDate>Thu, 11 Apr 2024 18:22:43 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/task-not-serializable-java-io-notserializableexception-org/m-p/66087#M33013</guid>
      <dc:creator>Jinfei</dc:creator>
      <dc:date>2024-04-11T18:22:43Z</dc:date>
    </item>
  </channel>
</rss>

