Data Engineering

create a dataframe with all the responses from the api requests within foreachPartition

Sandesh87
New Contributor III

I am trying to make API calls that fetch objects (JSON) from Amazon S3, and I am using foreachPartition to execute multiple calls in parallel:

df.rdd.foreachPartition(partition => {
  //Initialize list buffer
  var buffer_accounts1 = new ListBuffer[String]()
 
  //Initialize Connection to amazon s3
  val s3 = s3clientConnection()
 
  partition.foreach(fun=>{
   //api to get object from s3 bucket
   //the first column of each row contains s3 object name
    val obj = getS3Object(s3, "my_bucket", fun.getString(0)).getContent
    val objString = IOUtils.toString(obj, "UTF-8")
    buffer_accounts1 += objString 
  })
  buffer_accounts1.toList.toDF("Object").write.parquet("dbfs:/mnt/test")
 })

From the foreachPartition, I would like to store the string responses from all of the API calls in a single DataFrame. For example, if foreachPartition makes a total of 100 API calls, I would like one DataFrame that holds all 100 responses.

To do this, I am building a mutable list and trying to convert it to a DataFrame within foreachPartition, but a DataFrame cannot be created outside of the driver.

I want to create a DataFrame with the responses from all of the API calls made within foreachPartition so that I can apply further transformations. How can this be achieved?

Note: I could write every response to disk as JSON and read them back in, but that degrades performance because of the large number of disk I/O operations.

1 ACCEPTED SOLUTION

Accepted Solutions

Sandesh87
New Contributor III

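It can be achieved using mapPartitions. Unlike foreachPartition, which returns nothing, mapPartitions returns a new Dataset built from the values each partition produces, so the responses never have to be turned into a DataFrame on the executors.

// Provides the Encoder[String] that mapPartitions and toDF need
import spark.implicits._

val df_response = df.mapPartitions(iterator => {
  // Build the S3 client once per partition, on the executor
  val api_connect = new s3clientBuild()
  val s3client = api_connect.s3connection(AccessKey, SecretKey)
  // Map each row (first column = S3 object name) to its response string
  iterator.map(row => getS3(row.getString(0), s3client))
}).toDF("value")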
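For anyone who wants to try the pattern without S3 access, here is a minimal sketch that runs on a local SparkSession. FakeClient and its fetch method are hypothetical stand-ins for the S3 client and getS3 helper, which are not shown in this thread, and the object and column names are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object MapPartitionsSketch {
  // Hypothetical stand-in for the per-partition S3 client; not a real AWS API.
  final class FakeClient {
    def fetch(key: String): String = s"""{"key":"$key"}"""
  }

  // Turns a DataFrame of object names (first column) into a DataFrame of responses.
  def fetchAll(spark: SparkSession, keys: DataFrame): DataFrame = {
    import spark.implicits._ // Encoder[String] for mapPartitions, plus toDF
    keys.mapPartitions { rows =>
      // Created once per partition, on the executor
      val client = new FakeClient()
      // Lazy: each fetch runs as Spark consumes the returned iterator
      rows.map(row => client.fetch(row.getString(0)))
    }.toDF("value")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    import spark.implicits._
    val keys = Seq("a.json", "b.json", "c.json").toDF("name")
    val responses = fetchAll(spark, keys)
    responses.show(truncate = false)
    spark.stop()
  }
}
```

Because iterator.map is lazy, a real client should not be closed inside the partition function before the returned iterator has been consumed, or the later fetches would run against a closed client.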


2 REPLIES 2

jose_gonzalez
Moderator

Hi @Sandesh Puligundla,

Thank you for sharing the solution. We will mark it as the "best" response so that, if another user has the same question in the future, they will be able to find the solution right away.
