org.apache.spark.SparkException: Job aborted due to stage failure when writing to Cosmos

PrashantAghara
New Contributor II

I am writing data to Cosmos DB using Python & Spark on Databricks.

I am getting the below error:

org.apache.spark.SparkException: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=192, partition=105) failed; but task commit success, data duplication may happen. reason=TaskKilled(preempted by scheduler,Vector(AccumulableInfo(49363,None,Some(102466),None,false,true,None), AccumulableInfo(49365,None,Some(0),None,false,true,None), AccumulableInfo(49366,None,Some(946),None,false,true,None), AccumulableInfo(49374,None,Some(22630),None,false,true,None), AccumulableInfo(49375,None,Some(6186),None,false,true,None), AccumulableInfo(49376,None,Some(4252526),None,false,true,None), AccumulableInfo(49377,None,Some(0),None,false,true,None), AccumulableInfo(49378,None,Some(1157411),None,false,true,None), AccumulableInfo(49379,None,Some(4269),None,false,true,None), AccumulableInfo(49380,None,Some(32364),None,false,true,None), AccumulableInfo(49381,None,Some(0),None,false,true,None), AccumulableInfo(49382,None,Some(0),None,false,true,None), AccumulableInfo(49383,None,Some(0),None,false,true,None), AccumulableInfo(49384,None,Some(0),None,false,true,None), AccumulableInfo(49385,None,Some(0),None,false,true,None), AccumulableInfo(49386,None,Some(0),None,false,true,None), AccumulableInfo(49387,None,Some(0),None,false,true,None), AccumulableInfo(49388,None,Some(0),None,false,true,None), AccumulableInfo(49389,None,Some(28397),None,false,true,None), AccumulableInfo(49390,None,Some(0),None,false,true,None), AccumulableInfo(49399,None,Some(3453555),None,false,true,None), AccumulableInfo(49400,None,Some(9112),None,false,true,None), AccumulableInfo(49411,None,Some(2972152),None,false,true,None), AccumulableInfo(49412,None,Some(6964402304),None,false,true,None)),Vector(LongAccumulator(id: 49363, name: Some(internal.metrics.executorRunTime), value: 102466), LongAccumulator(id: 49365, name: Some(internal.metrics.resultSize), value: 0), LongAccumulator(id: 49366, name: Some(internal.metrics.jvmGCTime), value: 946), LongAccumulator(id: 49374, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 22630), LongAccumulator(id: 49375, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 6186), LongAccumulator(id: 49376, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 4252526), LongAccumulator(id: 49377, name: Some(internal.metrics.shuffle.read.remoteBytesReadToDisk), value: 0), LongAccumulator(id: 49378, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 1157411), LongAccumulator(id: 49379, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 4269), LongAccumulator(id: 49380, name: Some(internal.metrics.shuffle.read.recordsRead), value: 32364), LongAccumulator(id: 49381, name: Some(internal.metrics.shuffle.push.read.corruptMergedBlockChunks), value: 0), LongAccumulator(id: 49382, name: Some(internal.metrics.shuffle.push.read.mergedFetchFallbackCount), value: 0), LongAccumulator(id: 49383, name: Some(internal.metrics.shuffle.push.read.remoteMergedBlocksFetched), value: 0), LongAccumulator(id: 49384, name: Some(internal.metrics.shuffle.push.read.localMergedBlocksFetched), value: 0), LongAccumulator(id: 49385, name: Some(internal.metrics.shuffle.push.read.remoteMergedChunksFetched), value: 0), LongAccumulator(id: 49386, name: Some(internal.metrics.shuffle.push.read.localMergedChunksFetched), value: 0), LongAccumulator(id: 49387, name: Some(internal.metrics.shuffle.push.read.remoteMergedBytesRead), value: 0), LongAccumulator(id: 49388, name: Some(internal.metrics.shuffle.push.read.localMergedBytesRead), value: 0), 
LongAccumulator(id: 49389, name: Some(internal.metrics.shuffle.read.remoteReqsDuration), value: 28397), LongAccumulator(id: 49390, name: Some(internal.metrics.shuffle.push.read.remoteMergedReqsDuration), value: 0), LongAccumulator(id: 49399, name: Some(internal.metrics.output.bytesWritten), value: 3453555), LongAccumulator(id: 49400, name: Some(internal.metrics.output.recordsWritten), value: 9112), LongMaxMinAccumulator(id: 49411, name: Some(internal.metrics.photonBufferPoolMemorySize), value: 2972152), LongAccumulator(id: 49412, name: Some(internal.metrics.photonizedTaskTimeNs), value: 6964402304)),WrappedArray(5773297400, 355825696, 0, 2617257856, 31649014, 88080432, 31649014, 2705338288, 19565642, 0, 134217728, 19565642, 0, 758730746, 0, 0, 0, 0, 0, 0, 264, 10779, 10, 2499, 13278))
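The key part is "Authorized committer ... failed; but task commit success, data duplication may happen" with reason=TaskKilled(preempted by scheduler): a task attempt was killed by the scheduler after another attempt of the same partition had already committed its output. One common trigger is speculative execution relaunching slow tasks. A minimal check, assuming speculation is the cause (these are standard Spark settings, not Cosmos-specific):

# Inspect whether speculative execution is enabled; a speculative attempt
# killed after a sibling attempt already committed matches this signature.
print(spark.conf.get("spark.speculation", "false"))

# If enabled, disabling it may avoid the commit race. On Databricks this
# is usually set in the cluster's Spark config rather than at runtime:
#   spark.speculation false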

Code:

# Append the DataFrame to Cosmos DB via the OLTP connector.
(result.write
    .format("cosmos.oltp")
    .options(**cosmosConfigs)   # connection settings for the target account/container
    .mode("append")
    .option("maxBatchSize", 5000)
    .save())
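For reference, here is the same write with the duplicate-safety and bulk options spelled out. This is a sketch assuming the Azure Cosmos DB Spark 3 OLTP connector; with ItemOverwrite the write behaves as an upsert, so a replayed task attempt rewriting the same ids does not produce duplicate documents (verify the option names against the connector version in use):

(result.write
    .format("cosmos.oltp")
    .options(**cosmosConfigs)
    # Upsert semantics: a replayed task overwrites rather than duplicates.
    .option("spark.cosmos.write.strategy", "ItemOverwrite")
    # Bulk mode batches requests and retries 429 (throttling) responses.
    .option("spark.cosmos.write.bulk.enabled", "true")
    .mode("append")
    .save())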
 
Can anyone let me know the solution for it?
1 REPLY

PrashantAghara
New Contributor II

The cluster configs are:

Worker type & driver type: Standard_D16ads_v5

RUs for Cosmos: 1.5L (1.5 lakh, i.e., 150,000 RU/s)
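With 150,000 RU/s provisioned, one way to keep a large Spark write from saturating the container (and being throttled into long, preemption-prone tasks) is the connector's throughput control. A sketch, again assuming the Spark 3 OLTP connector; the ThroughputControl database and container names below are hypothetical placeholders that must be created up front:

throughput_control = {
    "spark.cosmos.throughputControl.enabled": "true",
    "spark.cosmos.throughputControl.name": "cosmosWriteJob",  # hypothetical group name
    # Cap this job at ~80% of the container's provisioned RU/s.
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.80",
    # Hypothetical control-plane database/container; create them beforehand.
    "spark.cosmos.throughputControl.globalControl.database": "ThroughputControl",
    "spark.cosmos.throughputControl.globalControl.container": "ThroughputControlItems",
}

(result.write
    .format("cosmos.oltp")
    .options(**cosmosConfigs)
    .options(**throughput_control)  # merged with the connection options above
    .mode("append")
    .save())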