Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

KinesisSource generates empty microbatches when there is no new data.

pranathisg97
New Contributor III

Is it normal for KinesisSource to generate empty microbatches when there is no new data in Kinesis?

BatchId 1 finished because there were records in Kinesis, and BatchId 2 started. BatchId 2 was running, but then BatchId 3 started.

Why did BatchId 3 trigger even though there was no more data?

{
  "id" : "b7ce55f9-0325-45ce-9454-5e594a6deb36",
  "runId" : "05f969fb-18bd-4891-8571-0f4f0eb34a36",
  "name" : "display_query_3",
  "timestamp" : "2023-01-01T06:14:21.000Z",
  "batchId" : 3,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 0,
    "triggerExecution" : 0
  },
  "stateOperators" : [ {
    "operatorName" : "globalLimit",
    "numRowsTotal" : 1,
    "numRowsUpdated" : 0,
    "allUpdatesTimeMs" : 5,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 56,
    "memoryUsedBytes" : 640,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 1,
    "numStateStoreInstances" : 1,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 2,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 280
    }
  } ],
  "sources" : [ {
    "description" : "KinesisV2[stream11]",
    "startOffset" : [ {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49638523104722792531473200236855507911023412145186406402",
      "lastSeqNum" : "49638523104722792531473200236855507911023412145186406402",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49638523104745093276671730859999461480935289833761275922",
      "lastSeqNum" : "49638523104745093276671730859999461480935289833761275922",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000002"
      },
      "firstSeqNum" : "49638523104767394021870261483142206125027552824441962530",
      "lastSeqNum" : "49638523104767394021870261483142206125027552824441962530",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000003"
      },
      "firstSeqNum" : null,
      "lastSeqNum" : null,
      "closed" : false,
      "msBehindLatest" : "0"
    } ],
    "endOffset" : [ {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49638523104722792531473200236855507911023412145186406402",
      "lastSeqNum" : "49638523104722792531473200236855507911023412145186406402",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49638523104745093276671730859999461480935289833761275922",
      "lastSeqNum" : "49638523104745093276671730859999461480935289833761275922",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000002"
      },
      "firstSeqNum" : "49638523104767394021870261483142206125027552824441962530",
      "lastSeqNum" : "49638523104767394021870261483142206125027552824441962530",
      "closed" : false,
      "msBehindLatest" : "0"
    }, {
      "shard" : {
        "stream" : "stream1",
        "shardId" : "shardId-000000000003"
      },
      "firstSeqNum" : null,
      "lastSeqNum" : null,
      "closed" : false,
      "msBehindLatest" : "0"
    } ],
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "polling",
      "numClosedShards" : "0",
      "numProcessedBytes" : "0",
      "numProcessedRecords" : "0",
      "numStreams" : "1",
      "numTotalShards" : "4",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "MemorySink",
    "numOutputRows" : 0
  }
}
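
# Read from Kinesis with the Databricks Kinesis source.
# kinesis_stream_name, region_name, initial_position and the AWS keys are defined elsewhere.
kinesisDF = (
    spark.readStream
    .format("kinesis")
    .option("streamName", kinesis_stream_name)
    .option("region", region_name)
    .option("initialPosition", initial_position)
    .option("awsAccessKey", aws_access_key)
    .option("awsSecretKey", aws_secret_access_key)
    # How long to buffer prefetched data before making it available for processing.
    .option("maxFetchDuration", "60s")
    # How long to wait between consecutive prefetch attempts.
    .option("minFetchPeriod", "60s")
    .load()
)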

Can anyone help me understand why empty batches are triggered with the Kinesis source?

2 REPLIES

Anonymous
Not applicable

Yes, it is normal for a KinesisSource to generate empty microbatches when there is no new data in the Kinesis stream. This behavior is a consequence of how the KinesisSource is designed to work.

The KinesisSource continuously polls the Kinesis stream for new data to build the microbatches that the Structured Streaming query processes. If no new data is available in the stream, the KinesisSource can still create a microbatch with no records in it. The query processes these empty microbatches as usual, which typically produces no output.
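If the empty microbatches matter mainly because downstream logic runs on them, one option is to guard the per-batch handler. The following is a minimal sketch, not the connector's own behavior: it assumes the query is written with `foreachBatch` (the question uses `display`), and `skip_empty` and `write_batch` are hypothetical names introduced for illustration.

```python
from typing import Any, Callable


def skip_empty(handler: Callable[[Any, int], None]) -> Callable[[Any, int], None]:
    """Wrap a foreachBatch handler so it is not called for empty microbatches.

    batch_df is expected to be a Spark DataFrame; only its take() method is used.
    """
    def guarded(batch_df: Any, batch_id: int) -> None:
        # take(1) returns a list with at most one row, so this avoids a full count.
        if not batch_df.take(1):
            return
        handler(batch_df, batch_id)

    return guarded


# Hypothetical usage, assuming kinesisDF from the question and a user-defined
# write_batch(batch_df, batch_id) function:
#
#   kinesisDF.writeStream.foreachBatch(skip_empty(write_batch)).start()
```

This does not stop the empty microbatch from being triggered or reported in the query progress; it only skips the user-defined work for that batch.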

As for your question about BatchId 3: the KinesisSource keeps polling the stream after BatchId 2 is created, so a new microbatch can be triggered whether or not records have arrived. In the progress report you posted, BatchId 3 has numInputRows of 0 and its startOffset and endOffset are identical for every shard, so it appears to be one of these empty microbatches rather than a batch triggered by newly arrived data.
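One way to check whether a particular microbatch carried data is to compare the fields in its progress report. The sketch below assumes the report has already been parsed into a Python dict (for example with `json.loads`) and that each source entry uses the Kinesis layout shown in the question, with `startOffset` and `endOffset` as lists of per-shard entries containing `shard.shardId` and `lastSeqNum`. The function names are hypothetical.

```python
from typing import Any, Dict, List


def shards_that_advanced(source: Dict[str, Any]) -> List[str]:
    """Return the shard IDs whose lastSeqNum differs between startOffset and endOffset."""
    def last_seq_by_shard(offsets):
        # Map shardId -> lastSeqNum; a missing or null offset list yields an empty map.
        return {o["shard"]["shardId"]: o["lastSeqNum"] for o in offsets or []}

    start = last_seq_by_shard(source.get("startOffset"))
    end = last_seq_by_shard(source.get("endOffset"))
    return sorted(shard for shard, seq in end.items() if start.get(shard) != seq)


def is_empty_batch(progress: Dict[str, Any]) -> bool:
    """True if the batch read no rows and no shard offset moved in any source."""
    if progress.get("numInputRows", 0) != 0:
        return False
    return not any(shards_that_advanced(s) for s in progress.get("sources", []))
```

Applied to the BatchId 3 report in the question, where `numInputRows` is 0 and every shard has the same `lastSeqNum` in both offset lists, `is_empty_batch` would return `True`.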

Anonymous
Not applicable

Hi @Pranathi Girish

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 
