cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter

Sandesh87
New Contributor III

I have a getS3Object function to get (json) objects located in aws s3  

object client_connect extends Serializable {
     val s3_get_path = "/dbfs/mnt/s3response"  
     def getS3Objects(s3ObjectName: String, s3Client: AmazonS3): String = {
       val objectKey = s"${s3ObjectName}"
       val inputS3Stream = s3Client.getObject("myS3Bucket", objectKey).getObjectContent
       val inputS3String = IOUtils.toString(inputS3Stream, "UTF-8")
       val filePath = s"${s3_get_path}/${objectKey}"
       val file = new File(filePath)
       val fileWriter = new FileWriter(file)
       val bw = new BufferedWriter(fileWriter)
       bw.write(inputS3String)
       bw.close()
       fileWriter.close()
       inputS3String
     } 
    }

Messages are ingested using a streaming framework

The source streaming dataframe 'source_df' which reads from azure event hub looks like below

|body| 

|8c44f2715ab81c16ecb31d527e18465d.json~2021-05-26~13-14-56~OH| 

|a4f9e914c1a40e5828b0eb129b1234b2.json~2022=05-09~15-12-22~MI|

The 'body' column contains string values delimited by ‘~’ where the first element is the object id that is passed as a parameter into the getS3Object function

The second parameter to this function is the S3client used to connect to aws S3 which is defined inside a serializable class.

final class s3clientBuild() extends Serializable {
   def s3connection(AccessKey: String, SecretKey: String) = {
    val clientRegion: Regions = Regions.US_EAST_1
    val creds = new BasicAWSCredentials(AccessKey, SecretKey)
    AmazonS3ClientBuilder.standard()
   .withRegion(clientRegion)
   .withCredentials(new AWSStaticCredentialsProvider(creds))
   .build()  
   }
  }

  val AccessKey = dbutils.secrets.get(scope = "Scope", key = "AccessKey-ID")

  val SecretKey = dbutils.secrets.get(scope = "Scope", key = "AccessKey-Secret")

Write Stream:-

val streamWriter = source_df 
   .writeStream
   .queryName("Write_stream")
   .option("checkpointLocation", chk_pt)
   .trigger(Trigger.ProcessingTime(3 seconds))
   .outputMode("append")
   .foreachBatch(
   (batchDF: DataFrame, batchId: Long) => Try {
 
     batchDF.persist()
 
       val object_df = batchDF.select(split(col("body"), "~").getItem(0).as("ObjectID"))
 
       val df_response = object_df.repartition(2).mapPartitions(iterator => {
       val api_connect = new s3clientBuild()
       val s3client = api_connect.s3connection(AccessKey, SecretKey)
       val resp = iterator.map(row => {
       val name = cli_connector.getS3Objects(row.getString(0), s3client) 
      (name)
        })
     resp
      }).toDF("value").select(from_json($"value".cast("string"), MySchema) as "fields").select($"fields.*")
 
   df_response.count()
 
   batchDF.unpersist()
 } match {
   case Success(_) =>
   case Failure(e) => {throw e}
      } 
   )
 

  

However I get the below error message:-

Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter
 
  Serialization stack:
 
  	- object not serializable (class: org.apache.spark.sql.streaming.DataStreamWriter, value: org.apache.spark.sql.streaming.DataStreamWriter@1f1c5f4f)
 
  	- field (class: $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: sensorwriter_0526, type: class org.apache.spark.sql.streaming.DataStreamWriter)
 
  	- object (class $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@41c5f54f)
 
  	- element of array (index: 0)
 
  	- array (class [Ljava.lang.Object;, size 1)
 
  	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
 
  	- object (class java.lang.invoke.SerializedLambda)

How can this be resolved?

3 REPLIES 3

-werners-
Esteemed Contributor III

@Sandesh Puligundla​ , Hm hard to tell, but I am pretty sure it is because of the objects you wrote.

Keep in mind that spark is distributed.

You might wanna check out these links:

https://www.placeiq.com/2017/11/how-to-solve-non-serializable-errors-when-instantiating-objects-in-s...

https://stackoverflow.com/questions/40596871/how-spark-handles-object

Not an actual answer to your question, sorry. But this error is hard to pinpoint (for me anyway, hopefully some good coders can solve this).

Thank you, Werner. You are perfectly right! I had a similar errors but didn't figure out why, and the placeIq article you shared solved it. It is indeed because my objects and by adding `extends Serializable` solved the error successfully

Anonymous
Not applicable

Hey there @Sandesh Puligundla​ 

Hope all is well!

Just wanted to check in if you were able to resolve your issue and would you be happy to share the solution or mark an answer as best? Else please let us know if you need more help. 

We'd love to hear from you.

Thanks!

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.