10-26-2021 08:57 PM
Hi @Sandesh Puligundla, the issue is that you are using the Spark context inside foreachPartition. The function passed to foreachPartition runs on the executors, but a DataFrame can only be created on the Spark driver. A few Stack Overflow references:
If I understand correctly, a workaround would be to use mapPartitions and, inside its map function, call the retry function and return the result. This creates a new DataFrame with a status for each row. You can then filter out the failures and write them to Cosmos DB.
Sample code:
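To make the data flow of this workaround concrete without a cluster, here is a plain-Scala sketch that mimics it with ordinary collections. It is an illustration under stated assumptions, not Spark code: each inner sequence stands in for one partition, the hypothetical processPartition plays the role of the function passed to mapPartitions, and the hypothetical riskyOperation stands in for the S3 file operations. The key property is that the per-partition function never throws; it turns each row into a status record that can be filtered afterwards.

```scala
import scala.util.{Failure, Success, Try}

object MapPartitionsSketch {
  // One output record per input row: the row id, a status and an error message.
  final case class Status(id: Long, status: String, message: String)

  // Hypothetical per-row work that fails for even ids (stands in for file operations).
  def riskyOperation(id: Long): Long =
    if (id % 2 == 0) throw new RuntimeException(s"failed on $id") else id

  // Plays the role of the function passed to mapPartitions: it consumes an
  // iterator of rows and returns an iterator of status records, never throwing.
  def processPartition(rows: Iterator[Long]): Iterator[Status] = {
    // Per-partition setup (e.g. creating a client once) would go here.
    rows.map { id =>
      Try(riskyOperation(id)) match {
        case Success(_) => Status(id, "success", "")
        case Failure(t) => Status(id, "failure", t.getMessage)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Two "partitions" of ids 0..9, loosely mimicking spark.range(10).
    val partitions: Seq[Seq[Long]] = (0L until 10L).grouped(5).toList
    val results = partitions.flatMap(p => processPartition(p.iterator))
    // Only the failures would be written to Cosmos DB.
    results.filter(_.status == "failure").foreach(println)
  }
}
```

In Spark the same shape is mapPartitions on the Dataset followed by a filter on the status column, which keeps all DataFrame creation on the driver.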
import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}

object RetryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val my_dataframe = spark.range(10)

    // mapPartitions runs on the executors, so do not reference `spark` in here
    val df = my_dataframe.mapPartitions(iterator => {
      // do S3 client initialization (once per partition)
      iterator.map(row => {
        retry(2) {
          // perform file operations
          if (row % 2 == 0) {
            1 / 0 // this will cause an exception
          } else {
            row
          }
        }
      })
    }).toDF("uuid", "status", "message")

    // write this df to Cosmos DB
    df.filter($"status" === "failure").show()
  }

  // Runs fn up to n times, sleeping 1 s before each retry, and returns
  // (uuid, status, message) instead of throwing.
  def retry[T](n: Int)(fn: => T): (String, String, String) = {
    Try(fn) match {
      case Success(_) =>
        (java.util.UUID.randomUUID().toString, "success", "")
      case Failure(_) if n > 1 =>
        Thread.sleep(1000)
        retry(n - 1)(fn)
      case Failure(t) =>
        (java.util.UUID.randomUUID().toString, "failure", t.getMessage)
    }
  }
}
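The retry helper can also be exercised on its own, outside Spark. The sketch below is a variant, not the code from the answer: the hypothetical retryWithCount also reports how many attempts were made and takes the delay as a parameter (delayMs, also hypothetical) so a quick check need not sleep. It keeps the same contract as retry: it never throws, and the last failure is converted into a status tuple.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Evaluates fn up to n times; returns (status, message, attempts) instead of throwing.
  def retryWithCount[T](n: Int, delayMs: Long = 1000L)(fn: => T): (String, String, Int) = {
    @tailrec
    def loop(attempt: Int): (String, String, Int) =
      Try(fn) match { // fn is by-name, so it is re-evaluated on every attempt
        case Success(_) => ("success", "", attempt)
        case Failure(_) if attempt < n =>
          Thread.sleep(delayMs) // back off before the next attempt
          loop(attempt + 1)
        case Failure(t) => ("failure", String.valueOf(t.getMessage), attempt)
      }
    loop(1)
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    // Fails on the first call and succeeds on the second.
    val flaky = retryWithCount(3, delayMs = 0L) {
      calls += 1
      if (calls < 2) throw new RuntimeException("transient") else calls
    }
    println(flaky) // (success,,2)

    // Always fails, so both attempts are used up.
    val broken = retryWithCount(2, delayMs = 0L) {
      throw new RuntimeException("permanent")
    }
    println(broken) // (failure,permanent,2)
  }
}
```

Writing the loop as a nested @tailrec function lets the compiler confirm that retries do not grow the stack, which matters if n is large.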