
log error to cosmos db

Sandesh87
New Contributor III

Objective: retrieve objects from an S3 bucket using a GET API call, write each retrieved object to Azure Data Lake, and, in the case of errors such as 404s (object not found), write the error message to Cosmos DB.

"my_dataframe" consists of the a column (s3ObjectName) with object names like:-

|s3ObjectName|
|a1.json|
|b2.json|
|c3.json|
|d4.json|
|e5.json|

//retry function that writes the error to Cosmos DB in the event of failure
//(uuid() and ExceptionCfg are defined elsewhere in the notebook)
import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.functions.lit

def retry[T](n: Int)(fn: => T): T = {
  Try {
    fn // note: no "return" here -- a non-local return inside Try is a bug
  } match {
    case Success(x) => x
    case Failure(t: Throwable) =>
      Thread.sleep(1000)
      if (n > 1) {
        retry(n - 1)(fn)
      } else {
        val loggerDf = Seq(t.toString).toDF("Description")
          .withColumn("Type", lit("Failure"))
          .withColumn("id", uuid())
        loggerDf.write.format("cosmos.oltp").options(ExceptionCfg).mode("APPEND").save()
        throw t
      }
  }
}
 
//execute the S3 GET call for each object name
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import org.apache.commons.io.IOUtils
import java.io.{BufferedWriter, File, FileWriter}

my_dataframe.rdd.foreachPartition(partition => {
  // one S3 client per partition
  val creds = new BasicAWSCredentials(AccessKey, SecretKey)
  val clientRegion: Regions = Regions.US_EAST_1
  val s3client = AmazonS3ClientBuilder.standard()
    .withRegion(clientRegion)
    .withCredentials(new AWSStaticCredentialsProvider(creds))
    .build()
  partition.foreach(x => {
    retry(2) {
      val objectKey = x.getString(0)
      val stream = s3client.getObject(s3bucket_name, objectKey).getObjectContent
      val inputS3String = IOUtils.toString(stream, "UTF-8")
      val bw = new BufferedWriter(new FileWriter(new File(data_lake_file_path)))
      bw.write(inputS3String)
      bw.close()
    }
  })
})

When the above is executed, it results in the following error:

Caused by: java.lang.NullPointerException

This error occurs in the retry function when it attempts to create the loggerDf DataFrame and write it to Cosmos DB.

Is there another way to write the error messages to Cosmos DB?

1 ACCEPTED SOLUTION

User16763506477
Contributor III

Hi @Sandesh Puligundla, the issue is that you are using the Spark context inside foreachPartition. You can create a DataFrame only on the Spark driver. A few Stack Overflow references:

https://stackoverflow.com/questions/46964250/nullpointerexception-creating-dataset-dataframe-inside-...

https://stackoverflow.com/questions/40691086/how-to-use-sqlcontext-and-sparkcontext-inside-foreachpa...
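
To make the failure concrete, here is a distilled sketch of the anti-pattern (illustrative code, not the original): toDF needs the active SparkSession, which does not exist on the executors that run the foreachPartition body, so the DataFrame creation fails at runtime.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Anti-pattern: the foreachPartition body runs on executors, where no
// active SparkSession is available, so creating a DataFrame there fails
// (this is the source of the NullPointerException above).
spark.range(10).rdd.foreachPartition { _ =>
  val errDf = Seq("error message").toDF("Description") // fails on the executor
  errDf.write.format("cosmos.oltp").save()
}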

IIUC, a workaround would be to use mapPartitions and, inside the map function, call the retry function and return the result. This creates a new DataFrame with status codes; you can then filter out the failed rows and write them to Cosmos DB.

Sample code:

import org.apache.spark.sql.SparkSession
import scala.util.{Try, Success, Failure}

// the SparkSession parameter from the original post is dropped here: it was
// unused, and shipping the session to executors is exactly the anti-pattern
// being avoided
def retry[T](n: Int)(fn: => T): (String, String, String) = {
  Try {
    fn
  } match {
    case Success(_) => (java.util.UUID.randomUUID().toString, "success", "")
    case Failure(t: Throwable) =>
      Thread.sleep(1000)
      if (n > 1) {
        retry(n - 1)(fn)
      } else {
        (java.util.UUID.randomUUID().toString, "failure", t.getMessage)
      }
  }
}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val my_dataframe = spark.range(10)
import spark.implicits._

val df = my_dataframe.mapPartitions(iterator => {
  // do the S3 client initialization here, once per partition
  iterator.map(row => {
    retry(2) {
      // perform the file operations here
      if (row % 2 == 0) {
        1 / 0 // deliberately throws an exception
      } else {
        row
      }
    }
  })
}).toDF("uuid", "status", "message")

// filter the failed rows and write them to Cosmos DB (see sketch below)
df.filter($"status" === "failure").show()
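
For completeness, a minimal sketch of the final write to Cosmos DB, assuming the Azure Cosmos DB Spark 3 OLTP connector; the endpoint, key, database, and container values are placeholders, not taken from the original post:

// Placeholder connector options -- substitute your own account values
val cosmosCfg = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey"      -> "<account-key>",
  "spark.cosmos.database"        -> "<database>",
  "spark.cosmos.container"       -> "<container>"
)

// This runs on the driver, so the executor-side restriction does not apply
df.filter($"status" === "failure")
  .withColumnRenamed("uuid", "id") // Cosmos DB items require an "id" field
  .write.format("cosmos.oltp")
  .options(cosmosCfg)
  .mode("APPEND")
  .save()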


3 REPLIES

Anonymous
Not applicable

Hello! My name is Piper and I'm one of the community moderators for Databricks. Thank you for your question. It looks like a lot of people will be able to learn from the answer. Thank you for your patience while we wait for a response.


Hi @Sandesh Puligundla,

If @Gaurav Rupnar fully answered your question, would you be happy to mark his answer as best so that others can quickly find the solution?
