03-08-2022 09:53 AM
I am trying to execute an API call to get an object (JSON) from Amazon S3, and I am using foreachPartition to execute multiple calls in parallel:
df.rdd.foreachPartition(partition => {
  // Initialize a list buffer to collect responses
  val buffer_accounts1 = new ListBuffer[String]()
  // Initialize the connection to Amazon S3
  val s3 = s3clientConnection()
  partition.foreach(fun => {
    // API call to get the object from the S3 bucket;
    // the first column of each row contains the S3 object name
    val obj = getS3Object(s3, "my_bucket", fun.getString(0)).getContent
    val objString = IOUtils.toString(obj, "UTF-8")
    buffer_accounts1 += objString
  })
  buffer_accounts1.toList.toDF("Object").write.parquet("dbfs:/mnt/test")
})
From foreachPartition I would like to store the string responses from all of the API calls in a single DataFrame. For example, if I make a total of 100 API calls inside foreachPartition, I want one DataFrame containing all 100 responses.
To do this I am appending to a mutable list and trying to convert it to a DataFrame within foreachPartition, but a DataFrame cannot be created outside of the driver.
How can I collect all the responses from the API calls made within foreachPartition into one DataFrame, so that I can apply further transformations?
Note: I could write every response to disk as JSON and read them back in, but that degrades performance because of the large number of disk I/O operations.
03-08-2022 01:39 PM
It can be achieved using mapPartitions:
val df_response = df.mapPartitions(iterator => {
  // Build one S3 client per partition and reuse it for every row
  val api_connect = new s3clientBuild()
  val s3client = api_connect.s3connection(AccessKey, SecretKey)
  // Lazily map each row's object name to its fetched content
  iterator.map(row => getS3(row.getString(0), s3client))
}).toDF("value")
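The key idea above is that the client is created once per partition and the per-row calls happen lazily inside the returned iterator. The same shape can be sketched without Spark, using `grouped` chunks as a stand-in for partitions; `FakeClient` and `fetch` here are hypothetical placeholders for the real S3 client and `getS3`:

```scala
// Hypothetical stand-in for the S3 client used in the thread.
class FakeClient {
  def fetch(key: String): String = s"body-of-$key"
}

// Same shape as df.mapPartitions { iter => val c = init(); iter.map(c.fetch) }:
// one client per chunk ("partition"), rows mapped lazily, results collected once.
def fetchAll(keys: Seq[String], partitionSize: Int): List[String] =
  keys.grouped(partitionSize).flatMap { part =>
    val client = new FakeClient      // one connection per partition
    part.iterator.map(client.fetch)  // lazy per-row API calls
  }.toList

// Example: fetchAll(Seq("k1", "k2"), 1) returns List("body-of-k1", "body-of-k2")
```

Because `mapPartitions` returns a new Dataset rather than performing a side effect, all responses end up in one DataFrame on which further transformations can be applied.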
04-11-2022 02:08 PM
Hi @Sandesh Puligundla ,
Thank you for sharing the solution. We will mark it as the "best" response so that, if another user has the same question in the future, they will be able to find the solution right away.