Is it normal for KinesisSource to generate empty microbatches when there is no new data in Kinesis?
Batch 1 finished because there were records in Kinesis, and then BatchId 2 started. While BatchId 2 was still running, BatchId 3 started.
Why was BatchId 3 triggered even though there was no more data? The query progress reported for BatchId 3 is:
{
"id" : "b7ce55f9-0325-45ce-9454-5e594a6deb36",
"runId" : "05f969fb-18bd-4891-8571-0f4f0eb34a36",
"name" : "display_query_3",
"timestamp" : "2023-01-01T06:14:21.000Z",
"batchId" : 3,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 0,
"triggerExecution" : 0
},
"stateOperators" : [ {
"operatorName" : "globalLimit",
"numRowsTotal" : 1,
"numRowsUpdated" : 0,
"allUpdatesTimeMs" : 5,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 56,
"memoryUsedBytes" : 640,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 1,
"numStateStoreInstances" : 1,
"customMetrics" : {
"loadedMapCacheHitCount" : 2,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 280
}
} ],
"sources" : [ {
"description" : "KinesisV2[stream11]",
"startOffset" : [ {
"shard" : {
"stream" : "stream1",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49638523104722792531473200236855507911023412145186406402",
"lastSeqNum" : "49638523104722792531473200236855507911023412145186406402",
"closed" : false,
"msBehindLatest" : "0"
}, {
"shard" : {
"stream" : "stream1",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49638523104745093276671730859999461480935289833761275922",
"lastSeqNum" : "49638523104745093276671730859999461480935289833761275922",
"closed" : false,
"msBehindLatest" : "0"
}, {
"shard" : {
"stream" : "stream1",
"shardId" : "shardId-000000000002"
},
"firstSeqNum" : "49638523104767394021870261483142206125027552824441962530",
"lastSeqNum" : "49638523104767394021870261483142206125027552824441962530",
"closed" : false,
"msBehindLatest" : "0"
}, {
"shard" : {
"stream" : "stream1",
"shardId" : "shardId-000000000003"
},
"firstSeqNum" : null,
"lastSeqNum" : null,
"closed" : false,
"msBehindLatest" : "0"
} ],
"endOffset" : [ {
"shard" : {
"stream" : "stream1",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49638523104722792531473200236855507911023412145186406402",
"lastSeqNum" : "49638523104722792531473200236855507911023412145186406402",
"closed" : false,
"msBehindLatest" : "0"
}, {
"shard" : {
"stream" : "stream1",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49638523104745093276671730859999461480935289833761275922",
"lastSeqNum" : "49638523104745093276671730859999461480935289833761275922",
"closed" : false,
"msBehindLatest" : "0"
}, {
"shard" : {
"stream" : "stream1",
"shardId" : "shardId-000000000002"
},
"firstSeqNum" : "49638523104767394021870261483142206125027552824441962530",
"lastSeqNum" : "49638523104767394021870261483142206125027552824441962530",
"closed" : false,
"msBehindLatest" : "0"
}, {
"shard" : {
"stream" : "stream1",
"shardId" : "shardId-000000000003"
},
"firstSeqNum" : null,
"lastSeqNum" : null,
"closed" : false,
"msBehindLatest" : "0"
} ],
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "polling",
"numClosedShards" : "0",
"numProcessedBytes" : "0",
"numProcessedRecords" : "0",
"numStreams" : "1",
"numTotalShards" : "4",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "MemorySink",
"numOutputRows" : 0
}
}
# Build a streaming DataFrame that reads from the Kinesis stream.
# The chain is wrapped in parentheses instead of using backslash continuations:
# the original ended with a trailing "\" after .load(), which continues the
# statement onto whatever line follows and makes the snippet a SyntaxError.
kinesisDF = (
    spark.readStream
    .format("kinesis")
    .option("streamName", kinesis_stream_name)
    .option("region", region_name)
    .option("initialPosition", initial_position)
    # NOTE(review): explicit access keys are passed here; confirm whether an
    # instance profile / IAM role could be used instead of key options.
    .option("awsAccessKey", aws_access_key)
    .option("awsSecretKey", aws_secret_access_key)
    # NOTE(review): presumably these control how long prefetched data is
    # buffered and how often the source polls Kinesis; verify the exact
    # semantics against the connector documentation.
    .option("maxFetchDuration", "60s")
    .option("minFetchPeriod", "60s")
    .load()
)
Can anyone help me understand why empty batches are being triggered with the Kinesis source?