Making HTTP post requests on Spark using foreachPartition

CaioIshizaka_Co
New Contributor

I need some help understanding the behaviour of the code below in Spark (using Scala and Databricks).

I have a DataFrame (read from S3, if that matters) and want to send its data to an API through HTTP POST requests, in batches of at most 1000 records. So I repartitioned the DataFrame to make sure each partition has no more than 1000 records. I also created a json column for each row, so later I only need to put those values into an array.
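
A minimal sketch of the payload-building step, assuming each row's json column already holds one serialized JSON object. It batches inside a partition with Scala's Iterator.grouped, which caps every request at 1000 objects regardless of partition size; that is an alternative to relying on the repartition, not what the post does. BatchPayloads, toPayload and payloads are hypothetical names. An empty partition yields no payloads, whereas building the string directly from an empty iterator produces {"people": []}.

```scala
object BatchPayloads {
  // Hypothetical helper: wraps already-serialized JSON objects
  // (one per row, like the "json" column) in {"people": [...]}.
  def toPayload(jsonObjects: Seq[String]): String =
    jsonObjects.mkString("""{"people": [""", ",", "]}")

  // Splits one partition's JSON strings into payloads of at most maxBatch objects.
  // An empty iterator produces no payloads, so nothing would be sent for it.
  def payloads(jsonObjects: Iterator[String], maxBatch: Int = 1000): Iterator[String] =
    jsonObjects.grouped(maxBatch).map(toPayload)
}
```

Inside Spark this could be driven per partition, for example data.foreachPartition { (rows: Iterator[Row]) => BatchPayloads.payloads(rows.map(_.getAs[String]("json"))).foreach(p => send(p)) }, where send is a hypothetical function that performs the POST. With Scala 2.12, annotating the parameter as Iterator[Row] is the usual way to avoid an ambiguous-overload error on Dataset.foreachPartition.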

The trouble is in making the requests. I created a Serializable object with the following code:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.HttpHeaders
import org.apache.http.entity.StringEntity
import org.apache.commons.io.IOUtils
object postObject extends Serializable {
  // One HTTP client and one request object, shared by every call to makeHttpCall
  val client = HttpClientBuilder.create().build()
  val post = new HttpPost("https://my-cool-api-endpoint")
  post.addHeader(HttpHeaders.CONTENT_TYPE, "application/json")

  // Sends all rows of one partition as a single {"people": [...]} request body
  def makeHttpCall(row: Iterator[Row]) = {
    val json_str = """{"people": [""" + row.toSeq.map(x => x.getAs[String]("json")).mkString(",") + "]}"
    post.setEntity(new StringEntity(json_str))
    val response = client.execute(post)
    val entity = response.getEntity()
    // Print the status code, reason phrase and response body
    println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
    println(IOUtils.toString(entity.getContent()))
  }
}
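
For comparison, a minimal sketch of the same call that avoids two properties of the object above. First, the single HttpPost is mutated by every call, which is unsafe if the object ends up shared by Spark tasks running concurrently in one executor. Second, results are only printed, and println inside code that runs on executors typically writes to the executors' stdout logs rather than to the notebook. This sketch swaps in the JDK 11+ java.net.http client instead of Apache HttpClient so it has no extra dependencies; PostSketch and postJson are hypothetical names, and the URL is whatever endpoint you use.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object PostSketch {
  // A java.net.http.HttpClient is immutable once built and meant to be reused.
  val client: HttpClient = HttpClient.newHttpClient()

  // Builds a fresh request for every call instead of mutating one shared
  // request object, and returns (status code, response body) to the caller
  // instead of printing them.
  def postJson(url: String, payload: String): (Int, String) = {
    val request = HttpRequest.newBuilder(URI.create(url))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(payload))
      .build()
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    (response.statusCode(), response.body())
  }
}
```

Returning the status instead of printing it means the results can be brought back to the driver, for instance by calling postJson inside data.rdd.mapPartitions and then collect(), where they would show up in the notebook.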
Now when I try the following:

postObject.makeHttpCall(data.head(2).toIterator)

It works like a charm. The requests go through, there is some output on the screen, and my API gets that data.

But when I put it inside foreachPartition:

data.foreachPartition { x => 
  postObject.makeHttpCall(x)
}

Nothing happens: there is no output on screen, and nothing arrives at my API. If I rerun it, almost all stages are skipped. I believe that, for some reason, Spark is lazily evaluating my requests without actually performing them. I don't understand why, or how to force it.

1 reply

melo08
New Contributor II

Following for answers, because I have a similar question.