User16763506477
Databricks Employee

Hi @Sandesh Puligundla, the issue is that you are using the Spark context inside foreachPartition. The function passed to foreachPartition runs on the executors, and a DataFrame can only be created on the Spark driver. A few Stack Overflow references:

https://stackoverflow.com/questions/46964250/nullpointerexception-creating-dataset-dataframe-inside-...

https://stackoverflow.com/questions/40691086/how-to-use-sqlcontext-and-sparkcontext-inside-foreachpa...

IIUC, a workaround would be to use "mapPartitions" and, inside its "map" function, call the retry function and return the result. This creates a new DataFrame with one status row per input row. You can then filter the failed rows and write them to Cosmos DB.

Sample code:

import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}

object RetryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val my_dataframe = spark.range(10)
    val df = my_dataframe.mapPartitions(iterator => {
      // do s3 client initialization here (runs once per partition, on the executor)

      iterator.map(row => {
        // retry returns a (uuid, status, message) tuple; no SparkSession is needed here
        retry(2) {
          // perform file operations
          if (row % 2 == 0) {
            1 / 0 // this will cause an ArithmeticException
          } else {
            row
          }
        }
      })
    }).toDF("uuid", "status", "message")

    // write this df to cosmos db
    df.filter($"status" === "failure").show()
  }

  // Runs fn up to n times, sleeping 1 second after each failed attempt,
  // and reports the outcome as a (uuid, status, message) tuple.
  def retry[T](n: Int)(fn: => T): (String, String, String) = {
    Try {
      fn
    } match {
      case Success(_) => (java.util.UUID.randomUUID().toString, "success", "")
      case Failure(t) =>
        Thread.sleep(1000)
        if (n > 1) {
          retry(n - 1)(fn)
        } else {
          (java.util.UUID.randomUUID().toString, "failure", t.getMessage)
        }
    }
  }
}
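
To check what the status tuples look like without starting a Spark session, here is a minimal sketch that applies the same retry pattern to a plain Scala Iterator, which is the shape of data that mapPartitions hands to its function. The names RetrySketch and processPartition, the delayMs parameter, and the thrown IllegalStateException are assumptions made for illustration; they are not part of the sample above.

```scala
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Same idea as the retry helper in the sample; delayMs is a hypothetical
  // parameter (default 0) so the sketch runs without waiting between attempts.
  def retry[T](n: Int, delayMs: Long = 0L)(fn: => T): (String, String, String) =
    Try(fn) match {
      case Success(_) => (java.util.UUID.randomUUID().toString, "success", "")
      case Failure(t) =>
        if (n > 1) {
          Thread.sleep(delayMs)
          retry(n - 1, delayMs)(fn)
        } else {
          (java.util.UUID.randomUUID().toString, "failure", String.valueOf(t.getMessage))
        }
    }

  // Stand-in for the function passed to mapPartitions: Iterator in, Iterator out.
  def processPartition(rows: Iterator[Long]): Iterator[(String, String, String)] =
    rows.map { row =>
      retry(2) {
        // Simulated file operation that fails for even values
        if (row % 2 == 0) throw new IllegalStateException(s"cannot process $row")
        row
      }
    }

  def main(args: Array[String]): Unit = {
    val results = processPartition(Iterator(0L, 1L, 2L, 3L)).toList
    // Only the rows that still failed after all attempts
    results.filter(_._2 == "failure").foreach(println)
  }
}
```

Running main should print two failure tuples, for rows 0 and 2, each with a random UUID. Because the function only takes and returns an Iterator, the same body can be dropped into mapPartitions unchanged.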
