<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic log error to cosmos db in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/log-error-to-cosmos-db/m-p/13458#M8138</link>
    <description>&lt;P&gt;Objective: retrieve objects from an S3 bucket using a GET API call, write each retrieved object to Azure Data Lake, and, in case of errors such as 404s (object not found), write the error message to Cosmos DB.&lt;/P&gt;&lt;P&gt;"my_dataframe" has a column, s3ObjectName, containing object names such as:&lt;/P&gt;&lt;TABLE&gt;&lt;TR&gt;&lt;TH&gt;s3ObjectName&lt;/TH&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;a1.json&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;b2.json&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;c3.json&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;d4.json&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;e5.json&lt;/TD&gt;&lt;/TR&gt;&lt;/TABLE&gt;&lt;PRE&gt;&lt;CODE&gt;// Retry function that writes the error to Cosmos DB when every attempt fails
def retry[T](n: Int)(fn: =&amp;gt; T): T = {
  Try(fn) match {
    case Success(x) =&amp;gt; x
    case Failure(t: Throwable) =&amp;gt; {
      Thread.sleep(1000)
      if (n &amp;gt; 1) {
        retry(n - 1)(fn)
      } else {
        // Log the final failure to Cosmos DB, then rethrow
        val loggerDf = Seq(t.toString).toDF("Description")
          .withColumn("Type", lit("Failure"))
          .withColumn("id", uuid())
        loggerDf.write.format("cosmos.oltp").options(ExceptionCfg).mode("APPEND").save()
        throw t
      }
    }
  }
}

// Execute the S3 GET API call for each object
my_dataframe.rdd.foreachPartition(partition =&amp;gt; {
  val creds = new BasicAWSCredentials(AccessKey, SecretKey)
  val clientRegion: Regions = Regions.US_EAST_1
  val s3client = AmazonS3ClientBuilder.standard()
    .withRegion(clientRegion)
    .withCredentials(new AWSStaticCredentialsProvider(creds))
    .build()
  partition.foreach(x =&amp;gt; {
    retry(2) {
      val objectKey = x.getString(0)
      val i = s3client.getObject(s3bucket_name, objectKey).getObjectContent
      val inputS3String = IOUtils.toString(i, "UTF-8")
      val filePath = s"${data_lake_file_path}"
      val file = new File(filePath)
      val fileWriter = new FileWriter(file)
      val bw = new BufferedWriter(fileWriter)
      bw.write(inputS3String)
      bw.close()
      fileWriter.close()
    }
  })
})&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Executing the code above fails with the following error:&lt;/P&gt;&lt;P&gt;&lt;B&gt;Caused by: java.lang.NullPointerException&lt;/B&gt;&lt;/P&gt;&lt;P&gt;The error occurs in the retry function when it tries to create the DataFrame &lt;B&gt;&lt;I&gt;loggerDf&lt;/I&gt;&lt;/B&gt; and write it to Cosmos DB.&lt;/P&gt;&lt;P&gt;Is there another way to write the error messages to Cosmos DB?&lt;/P&gt;</description>
    <pubDate>Thu, 14 Oct 2021 03:05:19 GMT</pubDate>
    <dc:creator>Sandesh87</dc:creator>
    <dc:date>2021-10-14T03:05:19Z</dc:date>
    <item>
      <title>log error to cosmos db</title>
      <link>https://community.databricks.com/t5/data-engineering/log-error-to-cosmos-db/m-p/13458#M8138</link>
      <pubDate>Thu, 14 Oct 2021 03:05:19 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/log-error-to-cosmos-db/m-p/13458#M8138</guid>
      <dc:creator>Sandesh87</dc:creator>
      <dc:date>2021-10-14T03:05:19Z</dc:date>
    </item>
    <item>
      <title>Re: log error to cosmos db</title>
      <link>https://community.databricks.com/t5/data-engineering/log-error-to-cosmos-db/m-p/13459#M8139</link>
      <description>&lt;P&gt;Hello! My name is Piper and I'm one of the community moderators for Databricks. Thank you for your question. It looks like a lot of people will be able to learn from the answer. Thank you for your patience while we wait for a response.&lt;/P&gt;</description>
      <pubDate>Fri, 15 Oct 2021 16:32:08 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/log-error-to-cosmos-db/m-p/13459#M8139</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2021-10-15T16:32:08Z</dc:date>
    </item>
    <item>
      <title>Re: log error to cosmos db</title>
      <link>https://community.databricks.com/t5/data-engineering/log-error-to-cosmos-db/m-p/13460#M8140</link>
      <description>&lt;P&gt;Hi @Sandesh Puligundla, the issue is that you are using the Spark context inside foreachPartition. A DataFrame can only be created on the Spark driver: the function passed to foreachPartition runs on the executors, where the SparkSession is not usable, hence the NullPointerException. A few Stack Overflow references:&lt;/P&gt;&lt;P&gt;&lt;A href="https://stackoverflow.com/questions/46964250/nullpointerexception-creating-dataset-dataframe-inside-foreachpartition-foreach" target="test_blank"&gt;https://stackoverflow.com/questions/46964250/nullpointerexception-creating-dataset-dataframe-inside-foreachpartition-foreach&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;A href="https://stackoverflow.com/questions/40691086/how-to-use-sqlcontext-and-sparkcontext-inside-foreachpartition" target="test_blank"&gt;https://stackoverflow.com/questions/40691086/how-to-use-sqlcontext-and-sparkcontext-inside-foreachpartition&lt;/A&gt;&lt;/P&gt;&lt;P&gt;If I understand correctly, a workaround is to use mapPartitions and, inside its map function, call the retry function and return its result. This creates a new DataFrame with a status for each record; you can then filter the failures and write them to Cosmos DB from the driver.&lt;/P&gt;&lt;P&gt;Sample code:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}

object RetryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    val my_dataframe = spark.range(10)

    val df = my_dataframe.mapPartitions(iterator =&amp;gt; {
      // Initialize the S3 client here (once per partition)

      iterator.map(row =&amp;gt; {
        // retry returns a (uuid, status, message) tuple instead of throwing
        retry(2) {
          // Perform the S3 / file operations here
          if (row % 2 == 0) {
            1 / 0 // deliberately throws an ArithmeticException
          } else {
            row
          }
        }
      })
    }).toDF("uuid", "status", "message")

    // Write the failed records to Cosmos DB from the driver, e.g. with the
    // cosmos.oltp writer and ExceptionCfg options used in the question
    df.filter($"status" === "failure").show()
  }

  // Plain Scala retry: it never touches Spark, so it is safe to call on executors
  def retry[T](n: Int)(fn: =&amp;gt; T): (String, String, String) = {
    Try(fn) match {
      case Success(_) =&amp;gt;
        (java.util.UUID.randomUUID().toString, "success", "")
      case Failure(t) =&amp;gt;
        if (n &amp;gt; 1) {
          Thread.sleep(1000) // wait before the next attempt
          retry(n - 1)(fn)
        } else {
          (java.util.UUID.randomUUID().toString, "failure", t.getMessage)
        }
    }
  }
}&lt;/CODE&gt;&lt;/PRE&gt;</description>
      <pubDate>Wed, 27 Oct 2021 03:57:56 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/log-error-to-cosmos-db/m-p/13460#M8140</guid>
      <dc:creator>User16763506477</dc:creator>
      <dc:date>2021-10-27T03:57:56Z</dc:date>
    </item>
    <item>
      <title>Re: log error to cosmos db</title>
      <link>https://community.databricks.com/t5/data-engineering/log-error-to-cosmos-db/m-p/13461#M8141</link>
      <description>&lt;P&gt;Hi @Sandesh Puligundla,&lt;/P&gt;&lt;P&gt;If @Gaurav Rupnar fully answered your question, would you be happy to mark his answer as best so that others can quickly find the solution?&lt;/P&gt;</description>
      <pubDate>Mon, 08 Nov 2021 23:55:59 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/log-error-to-cosmos-db/m-p/13461#M8141</guid>
      <dc:creator>jose_gonzalez</dc:creator>
      <dc:date>2021-11-08T23:55:59Z</dc:date>
    </item>
  </channel>
</rss>

