10-13-2021 08:05 PM
Objective:- Retrieve objects from an S3 bucket using a 'get' api call, write the retrieved object to azure datalake and in case of errors like 404s (object not found) write the error message to cosmos DB
"my_dataframe" consists of the a column (s3ObjectName) with object names like:-
|s3ObjectName|
|a1.json|
|b2.json|
|c3.json|
|d4.json|
|e5.json|
//retry function that writes cosmos error in event of failure
def retry[T](n: Int)(fn: => T): T = {
Try {
return fn
} match {
case Success(x) => x
case Failure(t: Throwable) => {
Thread.sleep(1000)
if (n > 1) {
retry(n - 1)(fn)
} else {
val loggerDf = Seq((t.toString)).toDF("Description")
.withColumn("Type", lit("Failure"))
.withColumn("id", uuid())
loggerDf.write.format("cosmos.oltp").options(ExceptionCfg).mode("APPEND").save()
throw t
}
}
}
}
//execute s3 get api call
my_dataframe.rdd.foreachPartition(partition => {
val creds = new BasicAWSCredentials(AccessKey, SecretKey)
val clientRegion: Regions = Regions.US_EAST_1
val s3client = AmazonS3ClientBuilder.standard()
.withRegion(clientRegion)
.withCredentials(new AWSStaticCredentialsProvider(creds))
.build()
partition.foreach(x => {
retry (2) {
val objectKey = x.getString(0)
val i = s3client.getObject(s3bucket_name, objectKey).getObjectContent
val inputS3String = IOUtils.toString(i, "UTF-8")
val filePath = s"${data_lake_file_path}"
val file = new File(filePath)
val fileWriter = new FileWriter(file)
val bw = new BufferedWriter(fileWriter)
bw.write(inputS3String)
bw.close()
fileWriter.close()
}
})
})
When the above is executed it results in the following error:-
Caused by: java.lang.NullPointerException
This error occurs in the retry function when it is asked to create the dataframe loggerDf and write it to cosmos
Is there another way to write the error messages to cosmos DB ?
10-26-2021 08:57 PM
Hi @Sandesh Puligundla issue is that you are using spark context inside foreachpartition. You can create a dataframe only on the spark driver. Few stack overflow references
IIUC then a workaround would be to use "mapPartitions" and inside "map" functions call the retry functions and return the result. This will create a new dataframe with status codes. And then you can filter the failure ones and write them to cosmos db.
sample code
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val my_dataframe = spark.range(10)
import spark.implicits._
val df= my_dataframe.mapPartitions(iterator => {
/// do s3 client initialization
iterator.map(row => {
val arr = retry (2,spark){
//perform file operations
if(row%2==0){
1/0 // this will cause exception
}
else
{
row
}
}
arr
})
}).toDF("uuid","status","message")
//write this df to cosmos db
df.filter($"status"==="failure").show()
}
def retry[T](n: Int,spark: SparkSession)(fn: => T): (String, String, String) = {
Try {
fn
} match {
case Success(x) => { (java.util.UUID.randomUUID().toString,"success","") }
case Failure(t: Throwable) => {
Thread.sleep(1000)
if (n > 1) {
retry(n - 1,spark)(fn)
}
else {
(java.util.UUID.randomUUID().toString,"failure",t.getMessage)
}
}
}
}
10-15-2021 09:32 AM
Hello! My name is Piper and I'm one of the community moderators for Databricks. Thank you for your question. It looks like a lot of people will be able to learn from the answer. Thank you for your patience while we wait for a response.
10-26-2021 08:57 PM
Hi @Sandesh Puligundla issue is that you are using spark context inside foreachpartition. You can create a dataframe only on the spark driver. Few stack overflow references
IIUC then a workaround would be to use "mapPartitions" and inside "map" functions call the retry functions and return the result. This will create a new dataframe with status codes. And then you can filter the failure ones and write them to cosmos db.
sample code
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val my_dataframe = spark.range(10)
import spark.implicits._
val df= my_dataframe.mapPartitions(iterator => {
/// do s3 client initialization
iterator.map(row => {
val arr = retry (2,spark){
//perform file operations
if(row%2==0){
1/0 // this will cause exception
}
else
{
row
}
}
arr
})
}).toDF("uuid","status","message")
//write this df to cosmos db
df.filter($"status"==="failure").show()
}
def retry[T](n: Int,spark: SparkSession)(fn: => T): (String, String, String) = {
Try {
fn
} match {
case Success(x) => { (java.util.UUID.randomUUID().toString,"success","") }
case Failure(t: Throwable) => {
Thread.sleep(1000)
if (n > 1) {
retry(n - 1,spark)(fn)
}
else {
(java.util.UUID.randomUUID().toString,"failure",t.getMessage)
}
}
}
}
11-08-2021 03:55 PM
hi @Sandesh Puligundla ,
If @Gaurav Rupnar fully answered your question, would you be happy to mark his answer as best so that others can quickly find the solution?
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group