I have a getS3Objects function that fetches JSON objects stored in AWS S3:
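    import java.io.{BufferedWriter, File, FileWriter}
    import com.amazonaws.services.s3.AmazonS3
    import org.apache.commons.io.IOUtils

    object client_connect extends Serializable {
      val s3_get_path = "/dbfs/mnt/s3response"

      // Downloads one object from S3, saves a copy under s3_get_path, and returns its content.
      def getS3Objects(s3ObjectName: String, s3Client: AmazonS3): String = {
        val objectKey = s3ObjectName
        val inputS3Stream = s3Client.getObject("myS3Bucket", objectKey).getObjectContent
        // Close the stream after reading so the S3 connection is released.
        val inputS3String = try IOUtils.toString(inputS3Stream, "UTF-8") finally inputS3Stream.close()
        val file = new File(s"$s3_get_path/$objectKey")
        val bw = new BufferedWriter(new FileWriter(file))
        // Closing the BufferedWriter also closes the underlying FileWriter.
        try bw.write(inputS3String) finally bw.close()
        inputS3String
      }
    }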
Messages are ingested with Spark Structured Streaming.
The source streaming DataFrame, source_df, reads from Azure Event Hubs and looks like this:
|body| 
|8c44f2715ab81c16ecb31d527e18465d.json~2021-05-26~13-14-56~OH| 
|a4f9e914c1a40e5828b0eb129b1234b2.json~2022-05-09~15-12-22~MI|
The 'body' column contains string values delimited by '~'. The first element is the object ID, which is passed as the first argument to the getS3Objects function.
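To illustrate the split outside of Spark, here is a minimal plain-Scala sketch. The helper name objectIdOf is hypothetical and is not part of my job; it only mirrors what split(col("body"), "~").getItem(0) does for a single value.

```scala
// Hypothetical helper, for illustration only: returns the first
// '~'-delimited field of a 'body' value, i.e. the object ID.
def objectIdOf(body: String): String = body.split("~")(0)

// Sample value taken from the first row of source_df shown above.
val sample = "8c44f2715ab81c16ecb31d527e18465d.json~2021-05-26~13-14-56~OH"
val objectId = objectIdOf(sample)
// objectId == "8c44f2715ab81c16ecb31d527e18465d.json"
```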
The second argument is the S3 client used to connect to AWS S3. It is built by a serializable class:
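    import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
    import com.amazonaws.regions.Regions
    import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

    final class s3clientBuild() extends Serializable {
      // Builds an S3 client for us-east-1 from static credentials.
      def s3connection(AccessKey: String, SecretKey: String): AmazonS3 = {
        val clientRegion: Regions = Regions.US_EAST_1
        val creds = new BasicAWSCredentials(AccessKey, SecretKey)
        AmazonS3ClientBuilder.standard()
          .withRegion(clientRegion)
          .withCredentials(new AWSStaticCredentialsProvider(creds))
          .build()
      }
    }

    // Credentials are read from a Databricks secret scope.
    val AccessKey = dbutils.secrets.get(scope = "Scope", key = "AccessKey-ID")
    val SecretKey = dbutils.secrets.get(scope = "Scope", key = "AccessKey-Secret")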
Write stream:
    import scala.util.{Failure, Success, Try}
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, from_json, split}
    import org.apache.spark.sql.streaming.Trigger

    val streamWriter = source_df
      .writeStream
      .queryName("Write_stream")
      .option("checkpointLocation", chk_pt)
      .trigger(Trigger.ProcessingTime("3 seconds"))
      .outputMode("append")
      .foreachBatch(
        (batchDF: DataFrame, batchId: Long) => Try {
          batchDF.persist()

          // The object ID is the first '~'-delimited field of 'body'.
          val object_df = batchDF.select(split(col("body"), "~").getItem(0).as("ObjectID"))

          val df_response = object_df.repartition(2).mapPartitions(iterator => {
            // Build one S3 client per partition.
            val api_connect = new s3clientBuild()
            val s3client = api_connect.s3connection(AccessKey, SecretKey)
            iterator.map(row => client_connect.getS3Objects(row.getString(0), s3client))
          }).toDF("value")
            .select(from_json($"value".cast("string"), MySchema) as "fields")
            .select($"fields.*")

          df_response.count()

          batchDF.unpersist()
        } match {
          case Success(_) =>
          case Failure(e) => throw e
        }
      )
 
  
However, I get the following error message:
Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter
 
  Serialization stack:
 
  	- object not serializable (class: org.apache.spark.sql.streaming.DataStreamWriter, value: org.apache.spark.sql.streaming.DataStreamWriter@1f1c5f4f)
 
  	- field (class: $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: sensorwriter_0526, type: class org.apache.spark.sql.streaming.DataStreamWriter)
 
  	- object (class $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@41c5f54f)
 
  	- element of array (index: 0)
 
  	- array (class [Ljava.lang.Object;, size 1)
 
  	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
 
  	- object (class java.lang.invoke.SerializedLambda)
How can this be resolved?