
KinesisSource generates empty microbatches when there is no new data.

pranathisg97
New Contributor III

Is it normal for KinesisSource to generate empty microbatches when there is no new data in Kinesis?

Batch 1 finished because there were records in Kinesis, and BatchId 2 started. BatchId 2 was still running when BatchId 3 started.

Even though there was no more data, why did BatchId 3 trigger?

{
  "id" : "b7ce55f9-0325-45ce-9454-5e594a6deb36",
  "runId" : "05f969fb-18bd-4891-8571-0f4f0eb34a36",
  "name" : "display_query_3",
  "timestamp" : "2023-01-01T06:14:21.000Z",
  "batchId" : 3,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 0,
    "triggerExecution" : 0
  },
  "stateOperators" : [ {
    "operatorName" : "globalLimit",
    "numRowsTotal" : 1,
    "numRowsUpdated" : 0,
    "allUpdatesTimeMs" : 5,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 56,
    "memoryUsedBytes" : 640,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 1,
    "numStateStoreInstances" : 1,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 2,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 280
    }
  } ],
  "sources" : [ {
    "description" : "KinesisV2[stream11]",
    "startOffset" : [ {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49638523104722792531473200236855507911023412145186406402",
      "lastSeqNum" : "49638523104722792531473200236855507911023412145186406402",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49638523104745093276671730859999461480935289833761275922",
      "lastSeqNum" : "49638523104745093276671730859999461480935289833761275922",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000002"
      },
      "firstSeqNum" : "49638523104767394021870261483142206125027552824441962530",
      "lastSeqNum" : "49638523104767394021870261483142206125027552824441962530",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000003"
      },
      "firstSeqNum" : null,
      "lastSeqNum" : null,
      "closed" : false,
      "msBehindLatest" : "0"
    } ],
    "endOffset" : [ {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49638523104722792531473200236855507911023412145186406402",
      "lastSeqNum" : "49638523104722792531473200236855507911023412145186406402",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49638523104745093276671730859999461480935289833761275922",
      "lastSeqNum" : "49638523104745093276671730859999461480935289833761275922",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000002"
      },
      "firstSeqNum" : "49638523104767394021870261483142206125027552824441962530",
      "lastSeqNum" : "49638523104767394021870261483142206125027552824441962530",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000003"
      },
      "firstSeqNum" : null,
      "lastSeqNum" : null,
      "closed" : false,
      "msBehindLatest" : "0"
    } ],
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "polling",
      "numClosedShards" : "0",
      "numProcessedBytes" : "0",
      "numProcessedRecords" : "0",
      "numStreams" : "1",
      "numTotalShards" : "4",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "MemorySink",
    "numOutputRows" : 0
  }
}

# Read the Kinesis stream as a streaming DataFrame.
# The variables used in the options are defined elsewhere in the notebook.
kinesisDF = (
    spark.readStream
    .format("kinesis")
    .option("streamName", kinesis_stream_name)
    .option("region", region_name)
    .option("initialPosition", initial_position)
    .option("awsAccessKey", aws_access_key)
    .option("awsSecretKey", aws_secret_access_key)
    .option("maxFetchDuration", "60s")
    .option("minFetchPeriod", "60s")
    .load()
)

Can anyone help me understand why empty batches are triggered with the Kinesis source?

2 REPLIES

Anonymous
Not applicable

Yes, it is normal for a KinesisSource to generate empty microbatches when there is no new data in the Kinesis stream. This behavior is a consequence of how the KinesisSource is designed to work.

The KinesisSource continuously polls the Kinesis stream for new data in order to create microbatches for the Structured Streaming query to process. If no new data is available in the stream, the KinesisSource can still create microbatches that contain no records. The query processes these empty microbatches as usual, which typically produces no output.
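
If the empty microbatches cause unwanted work downstream (for example, empty writes), one common workaround is to skip them in a foreachBatch handler. The following is only a minimal sketch, not part of the Kinesis connector: `make_batch_handler` and `my_write_fn` are hypothetical names, and it assumes the query is written with `writeStream.foreachBatch`.

```python
def make_batch_handler(write_fn):
    """Wrap write_fn so it is only called for microbatches that contain rows."""

    def handle(batch_df, batch_id):
        # take(1) fetches at most one row, so the emptiness check stays cheap.
        if not batch_df.take(1):
            return  # empty microbatch: nothing to write
        write_fn(batch_df, batch_id)

    return handle


# Hypothetical usage with the kinesisDF from the question
# (my_write_fn is a placeholder for your own write logic):
# query = (
#     kinesisDF.writeStream
#     .foreachBatch(make_batch_handler(my_write_fn))
#     .start()
# )
```

This does not stop the empty batches from being triggered; it only avoids doing work for them.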

As for BatchId 3: the progress report you posted shows numInputRows of 0 and identical startOffset and endOffset sequence numbers for every shard, so this batch did not read any new records. It appears to be one of the empty microbatches described above. The KinesisSource kept polling the stream after BatchId 2, and a new microbatch (BatchId 3) was started even though no new data had arrived.
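
To check whether a given batch was empty, you can inspect its progress report, like the one posted in the question. The sketch below is an illustration only: `is_empty_batch` is a hypothetical helper, and it assumes the report is available as a dict with the same fields as the posted JSON (in PySpark, `query.lastProgress` returns such a structure).

```python
import json


def is_empty_batch(progress):
    """Return True if the progress report shows that no new data was read."""
    if progress.get("numInputRows", 0) != 0:
        return False
    # With no new data, each source ends the batch at the offsets it started from.
    return all(
        src.get("startOffset") == src.get("endOffset")
        for src in progress.get("sources", [])
    )


# Abbreviated version of the report from the question (one shard only).
report = json.loads("""
{
  "batchId": 3,
  "numInputRows": 0,
  "sources": [{
    "description": "KinesisV2[stream11]",
    "startOffset": [{"shard": {"stream": "stream1", "shardId": "shardId-000000000003"},
                     "firstSeqNum": null, "lastSeqNum": null}],
    "endOffset":   [{"shard": {"stream": "stream1", "shardId": "shardId-000000000003"},
                     "firstSeqNum": null, "lastSeqNum": null}]
  }]
}
""")

print(is_empty_batch(report))  # True
```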

Anonymous
Not applicable

Hi @Pranathi Girish

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 
