Hi team,
Our pipeline is: Kinesis -> Delta table (raw) -> job with trigger=availableNow -> Delta table (target).
The Kinesis -> raw Delta stream runs continuously. The downstream job runs daily with trigger=availableNow.
The job reads from raw, does some transformations, and runs a MERGE inside foreachBatch.
When the job runs, it spends a long time in latestOffset and getBatch, plus about a minute in addBatch. For example:
[queryId = 3e0d2] [batchId = 290] Streaming query made progress: {
  "id" : "3e0...92",
  "runId" : "8c...6c",
  "name" : null,
  "timestamp" : "2024-10-15T18:45:28.123Z",
  "batchId" : 290,
  "batchDuration" : 1779275,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 61397,
    "commitOffsets" : 95,
    "getBatch" : 548418,
    "latestOffset" : 1168003,
    "queryPlanning" : 803,
    "triggerExecution" : 1779263,
    "walCommit" : 367
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[s3://..]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "...",
      "reservoirVersion" : 3207727,
      "index" : -1,
      "isStartingVersion" : false
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "...",
      "reservoirVersion" : 3223576,
      "index" : -1,
      "isStartingVersion" : false
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "13445289",
      "numFilesOutstanding" : "217"
    }
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  }
}
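To put numbers on the progress event above, here is a small stdlib-only Python sketch. It loads the event (trimmed to the fields it uses), reports what share of triggerExecution each phase took, and computes how many raw-table versions lie between startOffset and endOffset. It only does arithmetic on the values already in the log; it does not diagnose the cause. The helper name summarize is hypothetical.

```python
import json

# Progress event from the log above, trimmed to the fields used here.
PROGRESS = """
{
  "batchId": 290,
  "numInputRows": 0,
  "durationMs": {
    "addBatch": 61397, "commitOffsets": 95, "getBatch": 548418,
    "latestOffset": 1168003, "queryPlanning": 803,
    "triggerExecution": 1779263, "walCommit": 367
  },
  "sources": [{
    "startOffset": {"reservoirVersion": 3207727, "index": -1},
    "endOffset": {"reservoirVersion": 3223576, "index": -1}
  }]
}
"""


def summarize(progress: dict) -> dict:
    """Hypothetical helper: per-phase share of triggerExecution plus Delta version span."""
    durations = progress["durationMs"]
    total = durations["triggerExecution"]
    # Percentage of the whole trigger spent in each individual phase.
    shares = {
        phase: round(100.0 * ms / total, 1)
        for phase, ms in durations.items()
        if phase != "triggerExecution"
    }
    source = progress["sources"][0]
    # Number of raw-table versions between the batch's start and end offsets.
    span = (source["endOffset"]["reservoirVersion"]
            - source["startOffset"]["reservoirVersion"])
    return {"shares_pct": shares, "versions_spanned": span,
            "rows": progress["numInputRows"]}


if __name__ == "__main__":
    summary = summarize(json.loads(PROGRESS))
    # Print phases from slowest to fastest.
    for phase, pct in sorted(summary["shares_pct"].items(), key=lambda kv: -kv[1]):
        print(f"{phase:>14}: {pct:5.1f}% of triggerExecution")
    print("Delta versions spanned:", summary["versions_spanned"])
    print("Input rows:", summary["rows"])
```

On these numbers, latestOffset (about 65.6%) and getBatch (about 30.8%) account for roughly 96% of triggerExecution. Over the same batch the offsets advance by 15,849 versions of the raw table, while numInputRows is 0.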
The MERGE itself seems fast once the data has been read from raw. In the Spark UI there is a large gap, roughly as long as batchDuration (about 30 minutes), during which the query appears to be waiting on the batches. With trigger=availableNow, is the query supposed to take the last version from the checkpoint offset and read all versions after it, or does it do something else? Why is it so slow?
Thanks