log error to cosmos db

Sandesh87
New Contributor III

Objective: Retrieve objects from an S3 bucket using a GET API call, write each retrieved object to Azure Data Lake, and, in the case of errors such as 404s (object not found), write the error message to Cosmos DB.

"my_dataframe" consists of a column (s3ObjectName) with object names like:

|s3ObjectName|
|a1.json|
|b2.json|
|c3.json|
|d4.json|
|e5.json|
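
For reference, a DataFrame with that single column can be built on the driver along these lines (a minimal sketch for testing only, assuming an active SparkSession named spark):

import spark.implicits._

// hypothetical construction of my_dataframe for testing; the real one
// presumably comes from an upstream step in the pipeline
val my_dataframe = Seq("a1.json", "b2.json", "c3.json", "d4.json", "e5.json")
  .toDF("s3ObjectName")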

//retry function that writes the error to Cosmos DB when all attempts fail
//requires: import scala.util.{Try, Success, Failure}
//          import org.apache.spark.sql.functions.lit
//          import spark.implicits._ (for toDF)
//ExceptionCfg is a map of Cosmos DB connection options (see below);
//uuid() is assumed to be defined elsewhere, e.g. as expr("uuid()")
def retry[T](n: Int)(fn: => T): T = {
  Try {
    fn
  } match {
    case Success(x) => x
    case Failure(t: Throwable) => {
      Thread.sleep(1000)
      if (n > 1) {
        retry(n - 1)(fn)
      } else {
        // this is the write that fails with the NullPointerException below
        val loggerDf = Seq(t.toString).toDF("Description")
          .withColumn("Type", lit("Failure"))
          .withColumn("id", uuid())
        loggerDf.write.format("cosmos.oltp").options(ExceptionCfg).mode("APPEND").save()
        throw t
      }
    }
  }
}
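
ExceptionCfg is not shown in the post; with the Spark 3 Cosmos DB OLTP connector ("cosmos.oltp") it is typically a map of connection options along these lines (endpoint, key, database, and container below are placeholders, not values from the post):

// assumed shape of ExceptionCfg for the cosmos.oltp connector;
// every value here is a placeholder
val ExceptionCfg = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey"      -> "<account-key>",
  "spark.cosmos.database"        -> "<database>",
  "spark.cosmos.container"       -> "<error-log-container>"
)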
 
//execute the S3 GET call for every object name, one partition at a time
//requires: import com.amazonaws.auth.{BasicAWSCredentials, AWSStaticCredentialsProvider}
//          import com.amazonaws.regions.Regions
//          import com.amazonaws.services.s3.AmazonS3ClientBuilder
//          import org.apache.commons.io.IOUtils
//          import java.io.{BufferedWriter, File, FileWriter}
my_dataframe.rdd.foreachPartition(partition => {
  // build one S3 client per partition rather than per record
  val creds = new BasicAWSCredentials(AccessKey, SecretKey)
  val clientRegion: Regions = Regions.US_EAST_1
  val s3client = AmazonS3ClientBuilder.standard()
    .withRegion(clientRegion)
    .withCredentials(new AWSStaticCredentialsProvider(creds))
    .build()

  partition.foreach(x => {
    retry(2) {
      val objectKey = x.getString(0)
      val i = s3client.getObject(s3bucket_name, objectKey).getObjectContent
      val inputS3String = IOUtils.toString(i, "UTF-8")
      val filePath = s"${data_lake_file_path}"
      val file = new File(filePath)
      val fileWriter = new FileWriter(file)
      val bw = new BufferedWriter(fileWriter)
      bw.write(inputS3String)
      bw.close()
      fileWriter.close()
    }
  })
})

When the above is executed, it fails with:

Caused by: java.lang.NullPointerException

The error occurs inside the retry function, at the point where it tries to create the loggerDf DataFrame and write it to Cosmos DB.

Is there another way to write the error messages to Cosmos DB?

1 ACCEPTED SOLUTION


User16763506477
Contributor III

Hi @Sandesh Puligundla, the issue is that you are using the Spark context inside foreachPartition. A DataFrame can only be created on the Spark driver. A few Stack Overflow references:

https://stackoverflow.com/questions/46964250/nullpointerexception-creating-dataset-dataframe-inside-...

https://stackoverflow.com/questions/40691086/how-to-use-sqlcontext-and-sparkcontext-inside-foreachpa...

If I understand correctly, a workaround would be to use mapPartitions and, inside the map function, call the retry function and return the result. This produces a new DataFrame with status codes; you can then filter the failed rows and write them to Cosmos DB.

Sample code:

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val my_dataframe = spark.range(10)
import spark.implicits._

val df = my_dataframe.mapPartitions(iterator => {
  // do the S3 client initialization here, once per partition

  iterator.map(row => {
    retry(2, spark) {
      // perform the file operations here
      if (row % 2 == 0) {
        1 / 0 // this will cause an exception
      } else {
        row
      }
    }
  })
}).toDF("uuid", "status", "message")

// write this df to cosmos db
df.filter($"status" === "failure").show()
def retry[T](n: Int, spark: SparkSession)(fn: => T): (String, String, String) = {
  Try {
    fn
  } match {
    case Success(_) =>
      (java.util.UUID.randomUUID().toString, "success", "")
    case Failure(t: Throwable) =>
      Thread.sleep(1000)
      if (n > 1) {
        retry(n - 1, spark)(fn)
      } else {
        (java.util.UUID.randomUUID().toString, "failure", t.getMessage)
      }
  }
}
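
To finish the last step, the failed rows can then be written to Cosmos DB from the driver, reusing the same connector options as in the question. A minimal sketch, assuming ExceptionCfg holds the Cosmos connection options and mapping the columns onto the id/Description/Type document shape used in the question:

import org.apache.spark.sql.functions.lit

// driver-side write of only the failed records to Cosmos DB
df.filter($"status" === "failure")
  .withColumnRenamed("uuid", "id")
  .withColumnRenamed("message", "Description")
  .withColumn("Type", lit("Failure"))
  .drop("status")
  .write
  .format("cosmos.oltp")
  .options(ExceptionCfg)
  .mode("APPEND")
  .save()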


3 REPLIES

Anonymous
Not applicable

Hello! My name is Piper and I'm one of the community moderators for Databricks. Thank you for your question. It looks like a lot of people will be able to learn from the answer. Thank you for your patience while we wait for a response.


Hi @Sandesh Puligundla,

If @Gaurav Rupnar fully answered your question, would you be happy to mark his answer as best so that others can quickly find the solution?
