05-27-2022 12:39 PM
I have a getS3Objects function that fetches (JSON) objects stored in AWS S3:
import java.io.{BufferedWriter, File, FileWriter}
import com.amazonaws.services.s3.AmazonS3
import org.apache.commons.io.IOUtils

object client_connect extends Serializable {
  val s3_get_path = "/dbfs/mnt/s3response"

  // Downloads the object from S3, writes a copy to DBFS, and returns its contents.
  def getS3Objects(s3ObjectName: String, s3Client: AmazonS3): String = {
    val inputS3Stream = s3Client.getObject("myS3Bucket", s3ObjectName).getObjectContent
    val inputS3String = IOUtils.toString(inputS3Stream, "UTF-8")
    inputS3Stream.close()

    val file = new File(s"${s3_get_path}/${s3ObjectName}")
    val bw = new BufferedWriter(new FileWriter(file))
    try bw.write(inputS3String)
    finally bw.close() // also closes the underlying FileWriter

    inputS3String
  }
}
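For illustration, a call looks like this (the object key "abc123.json" is just a placeholder; the s3clientBuild class and the secret lookups are shown further down):

// Hypothetical example call; "abc123.json" is a placeholder object key.
val s3client = new s3clientBuild().s3connection(AccessKey, SecretKey)
val body = client_connect.getS3Objects("abc123.json", s3client)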
Messages are ingested using a streaming framework. The source streaming dataframe 'source_df', which reads from Azure Event Hubs, looks like this:
|body                                                        |
|8c44f2715ab81c16ecb31d527e18465d.json~2021-05-26~13-14-56~OH|
|a4f9e914c1a40e5828b0eb129b1234b2.json~2022-05-09~15-12-22~MI|
The 'body' column contains string values delimited by '~', where the first element is the object ID that is passed as a parameter into the getS3Objects function.
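So extracting the object ID is just a split on the first element (the same expression used in the foreachBatch below):

import org.apache.spark.sql.functions.{col, split}

// Keep the first '~'-delimited element of 'body': the S3 object key.
val object_df = source_df.select(split(col("body"), "~").getItem(0).as("ObjectID"))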
The second parameter to this function is the S3 client used to connect to AWS S3, which is created inside a serializable class:
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

final class s3clientBuild() extends Serializable {
  def s3connection(AccessKey: String, SecretKey: String): AmazonS3 = {
    val clientRegion: Regions = Regions.US_EAST_1
    val creds = new BasicAWSCredentials(AccessKey, SecretKey)
    AmazonS3ClientBuilder.standard()
      .withRegion(clientRegion)
      .withCredentials(new AWSStaticCredentialsProvider(creds))
      .build()
  }
}
val AccessKey = dbutils.secrets.get(scope = "Scope", key = "AccessKey-ID")
val SecretKey = dbutils.secrets.get(scope = "Scope", key = "AccessKey-Secret")
Write stream:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json, split}
import org.apache.spark.sql.streaming.Trigger
import scala.util.{Failure, Success, Try}
import spark.implicits._ // for $ and the String encoder used by mapPartitions

val streamWriter = source_df
  .writeStream
  .queryName("Write_stream")
  .option("checkpointLocation", chk_pt)
  .trigger(Trigger.ProcessingTime("3 seconds"))
  .outputMode("append")
  .foreachBatch(
    (batchDF: DataFrame, batchId: Long) => Try {
      batchDF.persist()
      // First '~'-delimited element of 'body' is the S3 object key.
      val object_df = batchDF.select(split(col("body"), "~").getItem(0).as("ObjectID"))
      val df_response = object_df.repartition(2).mapPartitions { iterator =>
        // Build one S3 client per partition, on the executor.
        val api_connect = new s3clientBuild()
        val s3client = api_connect.s3connection(AccessKey, SecretKey)
        iterator.map(row => client_connect.getS3Objects(row.getString(0), s3client))
      }.toDF("value")
        .select(from_json($"value".cast("string"), MySchema) as "fields")
        .select($"fields.*")
      df_response.count()
      batchDF.unpersist()
    } match {
      case Success(_) =>
      case Failure(e) => throw e
    }
  )
However, I get the error message below:
Task not serializable: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter
Serialization stack:
- object not serializable (class: org.apache.spark.sql.streaming.DataStreamWriter, value: org.apache.spark.sql.streaming.DataStreamWriter@1f1c5f4f)
- field (class: $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: sensorwriter_0526, type: class org.apache.spark.sql.streaming.DataStreamWriter)
- object (class $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@41c5f54f)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda)
How can this be resolved?
05-30-2022 01:00 AM
@Sandesh Puligundla, hm, hard to tell, but I am pretty sure it is because of the objects you wrote.
Keep in mind that Spark is distributed.
You might want to check out this link:
https://stackoverflow.com/questions/40596871/how-spark-handles-object
Not an actual answer to your question, sorry, but this error is hard to pinpoint (for me anyway; hopefully some good coders can solve this).
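To illustrate what I mean (a minimal sketch of the failure mode, not your exact code):

import spark.implicits._ // assumes a notebook/REPL SparkSession named `spark`

class Helper { def tag(s: String): String = "x-" + s } // note: does NOT extend Serializable
val helper = new Helper

val df = Seq("a", "b").toDF("value")
// Referencing `helper` inside the closure means Spark has to serialize it (plus
// whatever scope encloses it) to ship it to the executors; since Helper is not
// serializable, this fails with "Task not serializable".
val tagged = df.map(row => helper.tag(row.getString(0)))
tagged.show()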
04-11-2024 11:22 AM
Thank you, Werner. You are perfectly right! I had a similar error but didn't figure out why, and the placeIq article you shared solved it. It was indeed because of my objects, and adding `extends Serializable` fixed the error.
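For anyone else hitting this, the shape of the fix looked roughly like this (a minimal sketch with a hypothetical helper class; my real class was different):

import spark.implicits._ // assumes a notebook/REPL SparkSession named `spark`

// Same sketch as above, but the helper now extends Serializable, so the
// closure serializes cleanly and can be shipped to the executors.
class Helper extends Serializable { def tag(s: String): String = "x-" + s }
val helper = new Helper

val df = Seq("a", "b").toDF("value")
val tagged = df.map(row => helper.tag(row.getString(0)))
tagged.show() // no "Task not serializable" here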
07-28-2022 10:13 AM
Hey there @Sandesh Puligundla
Hope all is well!
Just wanted to check in: were you able to resolve your issue? If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.
We'd love to hear from you.
Thanks!