Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Making HTTP post requests on Spark using foreachPartition

CaioIshizaka_Co
New Contributor

I need some help understanding the behaviour of the code below in Spark (using Scala and Databricks).

I have a dataframe (read from S3, if that matters) and want to send its data via HTTP POST requests in batches of at most 1000 records. So I repartitioned the dataframe to make sure each partition has no more than 1000 records. I also created a json column for each row (so I only need to put them in an array later on).

The trouble is in making the requests. I created the following Serializable object:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.HttpHeaders
import org.apache.http.entity.StringEntity
import org.apache.commons.io.IOUtils
object postObject extends Serializable{
  val client = HttpClientBuilder.create().build()
  val post = new HttpPost("https://my-cool-api-endpoint")
  post.addHeader(HttpHeaders.CONTENT_TYPE,"application/json")
  def makeHttpCall(row: Iterator[Row]) = {
      val json_str = """{"people": [""" + row.toSeq.map(x => x.getAs[String]("json")).mkString(",") + "]}"      
      post.setEntity(new StringEntity(json_str))
      val response = client.execute(post)
      val entity = response.getEntity()
      println(Seq(response.getStatusLine.getStatusCode(), response.getStatusLine.getReasonPhrase()))
      println(IOUtils.toString(entity.getContent()))
  }
}

Now when I try the following:

postObject.makeHttpCall(data.head(2).toIterator)

It works like a charm. The requests go through, there is some output on the screen, and my API gets that data.

But when I try to put it in the foreachPartition:

data.foreachPartition { x => 
  postObject.makeHttpCall(x)
}

Nothing happens. No output on screen, and nothing arrives at my API. If I rerun it, almost all stages are just skipped. I believe that, for some reason, it is lazily evaluating my requests without actually performing them. I don't understand why, or how to force it.

1 REPLY

melo08
New Contributor II

Following for answers because I have a similar doubt.
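
One thing that may be worth checking for the question above: the function passed to foreachPartition runs on the executors, so anything it prints goes to the executor stdout logs rather than the notebook cell. That alone could explain the missing screen output, though not necessarily the missing API calls. Below is a minimal sketch, not a confirmed fix, of a helper that batches one partition's JSON strings and returns a result per request instead of printing it. The names `PartitionPoster`, `postInBatches`, and `send` are hypothetical, and `send` stands in for whatever code actually performs the HTTP call.

```scala
// Sketch only: `PartitionPoster`, `postInBatches`, and `send` are hypothetical
// names, not part of the original post or of any library.
object PartitionPoster {
  /**
   * Splits one partition's JSON strings into payloads of at most `batchSize`
   * elements, passes each payload to `send`, and returns whatever `send`
   * returns (for example an HTTP status code) instead of printing it.
   * The returned iterator is lazy: nothing is sent until it is consumed.
   */
  def postInBatches(jsonRows: Iterator[String], batchSize: Int)(send: String => Int): Iterator[Int] =
    jsonRows
      .grouped(batchSize)                                              // at most batchSize rows per request
      .map(batch => batch.mkString("""{"people": [""", ",", "]}"))     // same payload shape as the question
      .map(send)
}
```

In Spark this could be called from `mapPartitions` on the rows' json values, followed by an action such as `collect()`, so that the status codes come back to the driver where they can be inspected. That assumes `spark.implicits._` is imported for the `Int` encoder and that `send` creates its HTTP client inside the partition rather than sharing one mutable `HttpPost` across tasks.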
