I have a getS3Objects function that downloads (JSON) objects stored in AWS S3:
import java.io.{BufferedWriter, File, FileWriter}
import com.amazonaws.services.s3.AmazonS3
import org.apache.commons.io.IOUtils

object client_connect extends Serializable {
  val s3_get_path = "/dbfs/mnt/s3response"

  def getS3Objects(s3ObjectName: String, s3Client: AmazonS3): String = {
    val objectKey = s3ObjectName
    // Read the S3 object's content as a UTF-8 string
    val inputS3Stream = s3Client.getObject("myS3Bucket", objectKey).getObjectContent
    val inputS3String = IOUtils.toString(inputS3Stream, "UTF-8")
    // Write a copy of the response to DBFS
    val filePath = s"${s3_get_path}/${objectKey}"
    val file = new File(filePath)
    val fileWriter = new FileWriter(file)
    val bw = new BufferedWriter(fileWriter)
    bw.write(inputS3String)
    bw.close()
    fileWriter.close()
    inputS3String
  }
}
Messages are ingested with Spark Structured Streaming.
The source streaming DataFrame 'source_df', which reads from Azure Event Hubs, looks like this:
|body|
|8c44f2715ab81c16ecb31d527e18465d.json~2021-05-26~13-14-56~OH|
|a4f9e914c1a40e5828b0eb129b1234b2.json~2022-05-09~15-12-22~MI|
The 'body' column contains string values delimited by '~', where the first element is the object id that is passed as the first parameter to the getS3Objects function.
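For illustration, the extraction looks like this (a minimal sketch that reuses the same split expression as the batch logic further down; object_ids is just a name used here):

import org.apache.spark.sql.functions.{col, split}

// Sketch: take the first '~'-delimited field of body as the object id
val object_ids = source_df.select(split(col("body"), "~").getItem(0).as("ObjectID"))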
The second parameter is the S3 client used to connect to AWS S3, which is built by a serializable class:
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

final class s3clientBuild() extends Serializable {
  def s3connection(AccessKey: String, SecretKey: String): AmazonS3 = {
    val clientRegion: Regions = Regions.US_EAST_1
    val creds = new BasicAWSCredentials(AccessKey, SecretKey)
    AmazonS3ClientBuilder.standard()
      .withRegion(clientRegion)
      .withCredentials(new AWSStaticCredentialsProvider(creds))
      .build()
  }
}
val AccessKey = dbutils.secrets.get(scope = "Scope", key = "AccessKey-ID")
val SecretKey = dbutils.secrets.get(scope = "Scope", key = "AccessKey-Secret")
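For reference, outside the stream the client and the download function are wired together like this (a minimal sketch; the object name is just the example id from the table above):

import com.amazonaws.services.s3.AmazonS3

// Sketch: build a client from the secrets and fetch a single object
val s3client: AmazonS3 = new s3clientBuild().s3connection(AccessKey, SecretKey)
val json: String = client_connect.getS3Objects("8c44f2715ab81c16ecb31d527e18465d.json", s3client)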
Write Stream:-
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json, split}
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import spark.implicits._

val streamWriter = source_df
  .writeStream
  .queryName("Write_stream")
  .option("checkpointLocation", chk_pt)
  .trigger(Trigger.ProcessingTime(3.seconds))
  .outputMode("append")
  .foreachBatch(
    (batchDF: DataFrame, batchId: Long) => Try {
      batchDF.persist()
      // Extract the object id (first '~'-delimited field) from the body column
      val object_df = batchDF.select(split(col("body"), "~").getItem(0).as("ObjectID"))
      // Build one S3 client per partition and download each object
      val df_response = object_df.repartition(2).mapPartitions(iterator => {
        val api_connect = new s3clientBuild()
        val s3client = api_connect.s3connection(AccessKey, SecretKey)
        iterator.map(row => client_connect.getS3Objects(row.getString(0), s3client))
      }).toDF("value")
        // MySchema (defined elsewhere) describes the JSON payload
        .select(from_json($"value".cast("string"), MySchema) as "fields")
        .select($"fields.*")
      df_response.count()
      batchDF.unpersist()
    } match {
      case Success(_) =>
      case Failure(e) => throw e
    }
  )
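The query itself is started along these lines (a sketch of the launch, assuming streamWriter above is the query that triggers the failure):

// Assumption: starting the query; the serialization error surfaces once the
// stream starts and the foreachBatch closure is serialized
val query = streamWriter.start()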
However I get the error message below:-
Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter
Serialization stack:
- object not serializable (class: org.apache.spark.sql.streaming.DataStreamWriter, value: org.apache.spark.sql.streaming.DataStreamWriter@1f1c5f4f)
- field (class: $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: sensorwriter_0526, type: class org.apache.spark.sql.streaming.DataStreamWriter)
- object (class $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@41c5f54f)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda)
How can this be resolved?