Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Create a DataFrame with all the responses from the API requests within foreachPartition

Sandesh87
New Contributor III

I am trying to execute an API call to get an object (JSON) from Amazon S3, and I am using foreachPartition to execute multiple calls in parallel:

df.rdd.foreachPartition(partition => {
  // Initialize list buffer
  val buffer_accounts1 = new ListBuffer[String]()

  // Initialize connection to Amazon S3
  val s3 = s3clientConnection()

  partition.foreach(fun => {
    // API call to get the object from the S3 bucket;
    // the first column of each row contains the S3 object name
    val obj = getS3Object(s3, "my_bucket", fun.getString(0)).getContent
    val objString = IOUtils.toString(obj, "UTF-8")
    buffer_accounts1 += objString
  })
  // This is the step that fails: it runs on an executor, not on the driver
  buffer_accounts1.toList.toDF("Object").write.parquet("dbfs:/mnt/test")
})

From the foreachPartition, I would like to store the string responses from all of the API calls in a single DataFrame. So if I make a total of 100 API calls in my foreachPartition, I would like to create one DataFrame that contains all 100 responses.

To do this, I am creating a mutable list and want to convert it to a DataFrame within foreachPartition, but we cannot create a DataFrame outside of the driver.

I want to create a DataFrame with all the responses from the API calls made within the foreachPartition so that I can apply further transformations. How can this be achieved?

Note: I could write every response to disk as JSON and read them back in, but that degrades performance because of the heavy disk I/O.

1 ACCEPTED SOLUTION

Accepted Solutions

Sandesh87
New Contributor III

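It can be achieved using mapPartitions. Unlike foreachPartition, which is an action that returns nothing, mapPartitions is a transformation that returns a new Dataset, so the responses come back as a DataFrame on the driver side: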

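// Provides the implicit Encoder[String] and toDF
// (assumes `spark` is the active SparkSession, as in a Databricks notebook)
import spark.implicits._

val df_response = df.mapPartitions(iterator => {
  // Build one S3 client per partition, on the executor
  val api_connect = new s3clientBuild()
  val s3client = api_connect.s3connection(AccessKey, SecretKey)
  // The first column of each row holds the S3 object name
  iterator.map(row => getS3(row.getString(0), s3client))
}).toDF("value")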
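
For anyone who wants to try the pattern without S3 access, here is a minimal sketch that can be run locally. `FakeS3Client`, `fetch`, `fetchAll` and the `object_name` column are hypothetical names made up for this illustration (they are not part of the AWS SDK or of the code above); the stub simply returns a small JSON string per object name. It also assumes the client should be closed once the partition is done, which is why the results are materialized first.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical stand-in for a real S3 client (not an AWS SDK class).
class FakeS3Client {
  def fetch(key: String): String = s"""{"key":"$key"}"""
  def close(): Unit = ()
}

object MapPartitionsSketch {
  // Returns a one-column DataFrame ("value") with one response per input row.
  def fetchAll(spark: SparkSession, df: DataFrame): DataFrame = {
    import spark.implicits._ // Encoder[String] and toDF

    df.mapPartitions { rows =>
      // One client per partition, created on the executor.
      val client = new FakeS3Client()
      // Iterator.map is lazy, so force the calls before closing the client.
      val fetched = rows.map(row => client.fetch(row.getString(0))).toList
      client.close()
      fetched.iterator
    }.toDF("value")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("mapPartitions-sketch")
      .getOrCreate()
    import spark.implicits._

    val names = Seq("a.json", "b.json", "c.json").toDF("object_name")
    fetchAll(spark, names).show(false)
    spark.stop()
  }
}
```

Note that `toList` buffers the responses of a whole partition in executor memory. If the objects are large, it may be better to keep the iterator lazy and release the client some other way, for example with a task-completion listener.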


jose_gonzalez
Moderator

Hi @Sandesh Puligundla,

Thank you for sharing the solution. We will mark it as the "best" response so that, if another user has the same question in the future, they will be able to find the solution right away.
