Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Create a DataFrame with all the responses from the API requests within foreachPartition

Sandesh87
New Contributor III

I am trying to make an API call to get an object (JSON) from Amazon S3, and I am using foreachPartition to execute multiple calls in parallel:

df.rdd.foreachPartition(partition => {
  // Initialize the list buffer
  val buffer_accounts1 = new ListBuffer[String]()

  // Initialize the connection to Amazon S3
  val s3 = s3clientConnection()

  partition.foreach(fun => {
    // Get the object from the S3 bucket;
    // the first column of each row contains the S3 object name
    val obj = getS3Object(s3, "my_bucket", fun.getString(0)).getContent
    val objString = IOUtils.toString(obj, "UTF-8")
    buffer_accounts1 += objString
  })
  // This is the step that fails: toDF runs on an executor, not on the driver
  buffer_accounts1.toList.toDF("Object").write.parquet("dbfs:/mnt/test")
})

From the foreachPartition, I would like to store the string responses from all of the API calls in a single DataFrame. So if I make a total of 100 API calls in my foreachPartition, I would like to create one DataFrame that contains all 100 responses.

To do this, I am creating a mutable list and want to convert it to a DataFrame within foreachPartition, but a DataFrame cannot be created outside of the driver.

I want to create a DataFrame with all the responses from the API calls made within the foreachPartition so that I can apply further transformations. How can this be achieved?

Note: I could write every response to disk as JSON and read them back in, but that degrades performance because of the many disk I/O operations.

Accepted Solutions

Sandesh87
New Contributor III

This can be achieved using mapPartitions, which returns the per-partition results as a new Dataset instead of discarding them:

val df_response = df.mapPartitions(iterator => {
  // Build one S3 client per partition, on the executor
  val api_connect = new s3clientBuild()
  val s3client = api_connect.s3connection(AccessKey, SecretKey)
  // Fetch each object; the first column of every row holds the object name
  iterator.map(row => getS3(row.getString(0), s3client))
}).toDF("value")
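
For anyone who wants to try the pattern outside of this workspace, here is a minimal, self-contained sketch of the same idea. It is only an illustration under assumptions: `FakeClient`, `fetch`, `collectResponses` and the `object_name` column are hypothetical names that stand in for the real S3 client and helpers, and the session runs in local mode rather than on a cluster.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object MapPartitionsSketch {

  // Hypothetical stand-in for the S3 client (assumption, not a real SDK class).
  // A real client would be built from credentials and would call S3.
  final class FakeClient {
    def fetch(key: String): String = s"""{"key":"$key"}"""
  }

  // Returns one row per input row, holding the response string for that row.
  def collectResponses(spark: SparkSession, keys: DataFrame): DataFrame = {
    import spark.implicits._ // supplies the Encoder[String] that mapPartitions needs

    keys.mapPartitions { rows =>
      // Created once per partition, on the executor, so it is never serialized
      val client = new FakeClient()
      // Iterator.map is lazy: each fetch runs as Spark consumes the rows
      rows.map(row => client.fetch(row.getString(0)))
    }.toDF("value")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("mapPartitions-sketch")
      .getOrCreate()
    import spark.implicits._

    val keys = Seq("a.json", "b.json", "c.json").toDF("object_name")
    val responses = collectResponses(spark, keys)

    // The result is an ordinary DataFrame, so further transformations apply
    responses.show(truncate = false)
    spark.stop()
  }
}
```

Because the iterator returned from mapPartitions is consumed lazily, a real client should not be closed before the iterator has been fully read; otherwise later fetches would run against a closed connection.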


Replies

jose_gonzalez
Moderator

Hi @Sandesh Puligundla,

Thank you for sharing the solution. We will mark it as the "best" response so that, if another user has the same question in the future, they will be able to find the solution right away.
