Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to properly implement incremental batching from Kinesis Data Streams

Mathias_Peters
Contributor

Hi, 

I implemented a job that should incrementally read all the available data from a Kinesis Data Stream and then terminate. I schedule the job daily. The data retention period of the stream is 7 days, i.e., there should be enough time to fetch newly arrived data before it expires.

The code snippet reading from kinesis looks as follows: 

import sys

def dump_stream(name: str, region: str):
    print(f"Dumping stream {name} in region {region}")
    # Read the Kinesis stream; "earliest" applies on the first run only,
    # later runs resume from the checkpoint.
    kinesis_df = (
        spark.readStream.format("kinesis")
        .option("streamName", name)
        .option("region", region)
        .option("initialPosition", "earliest")
        .option("roleArn", f"arn:aws:iam::{sys.argv[3]}:role/ROLENAME")
        .option("roleSessionName", "databricks")
        .load()
    )
    # The Kinesis source delivers the payload as binary; cast it to a string.
    new_df = kinesis_df.selectExpr("cast(data as STRING) jsonData")

    # Write the records as text files and stop once all currently
    # available data has been processed (availableNow trigger).
    streaming_query = (
        new_df.writeStream.format("text")
        .option("checkpointLocation", f"{sys.argv[2]}/{name}/checkpoints/")
        .option("path", f"{sys.argv[2]}/{name}")
        .option("header", False)
        .trigger(availableNow=True)
        .start()
    )
    # Wait at most 20 seconds for the query to finish (timeout is in seconds).
    streaming_query.awaitTermination(20)

The job worked for a few days but then I encountered the following error: 

=========

org.apache.spark.SparkException: [KINESIS_COULD_NOT_READ_SHARD_UNTIL_END_OFFSET] Could not read until the desired sequence number 49651505085739424282347553331460231169802628074051731490 for shard shardId-000000000002 in kinesis stream STREAMNAME. The query will fail due to potential data loss. The last read record was at sequence number null This can happen if the data with endSeqNum has already been aged out, or the Kinesis stream was deleted and reconstructed with the same name. The failure behavior can be overridden by setting spark.databricks.kinesis.failOnDataLoss to false in spark configuration.

============

It is my understanding that, when specifying initialPosition as "earliest" in combination with a checkpoint location, the job will 1) attempt to read from the very beginning of the stream (trim_horizon?), 2) process all records until the availableNow trigger has drained the currently available data, 3) store the sequence number of the last processed event in the checkpoint, and 4) terminate (sketched below). The next run will then use the sequence number stored in step 3 as its starting point.
Did I miss something?
What can cause the job to fail (apart from a stream deletion, which afaik did not occur)?
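For illustration, this is the per-run behaviour I am assuming (just a sketch reusing the names from the snippet above; note that awaitTermination() without a timeout blocks until the availableNow batch has finished, whereas awaitTermination(20) returns after at most 20 seconds):

# One scheduled run: drain everything that is currently available, then stop.
streaming_query = (
    new_df.writeStream.format("text")
    .option("checkpointLocation", f"{sys.argv[2]}/{name}/checkpoints/")
    .option("path", f"{sys.argv[2]}/{name}")
    .trigger(availableNow=True)
    .start()
)
# Block until the batch is complete, so the checkpoint holds the last
# processed sequence number before the job terminates.
streaming_query.awaitTermination()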
best, 
Mathias

2 REPLIES

christy2951hern
New Contributor II

Hello @Mathias_Peters ,

The KINESIS_COULD_NOT_READ_SHARD_UNTIL_END_OFFSET error indicates that your Databricks job was unable to read all of the expected data from the Kinesis Data Stream.

The error message suggests the job cannot find data for a specific sequence number (49651505085739424282347553331460231169802628074051731490) in shard shardId-000000000002 of your Kinesis stream named STREAMNAME. This could happen if the data with that sequence number has expired from the Kinesis stream (retention period is 7 days) or if the stream was recreated with the same name, resetting sequence numbers.

Your understanding of how initialPosition="earliest" and checkpoints work is mostly correct:
1. The job starts reading from the beginning of the stream (similar to trim_horizon).
2. It processes all records available at the time of the (daily) run, after which the availableNow trigger stops the query.
3. The last processed sequence number is stored in the checkpoint (see the sketch below).
4. Subsequent runs resume from the checkpointed position.
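As a quick way to see point 3 in practice, you can list the checkpoint directory you pass to writeStream. This is only a sketch: the path is illustrative, and it assumes dbutils/display are available (e.g. from a notebook):

# Inspect the structured-streaming checkpoint (path is hypothetical).
checkpoint_path = "dbfs:/some/base/path/STREAMNAME/checkpoints/"
display(dbutils.fs.ls(checkpoint_path))
# You should typically see metadata, offsets/ and commits/; the files under
# offsets/ record the shard positions the next run resumes from.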

Best Regards,
NYStateofHealth

fixhour
New Contributor II

It seems like the issue is caused by data loss in the Kinesis stream. Even though you're using checkpoints and specifying the "earliest" position, records can expire because of the 7-day retention period, especially if there's a delay in job execution. To prevent the job from failing in that case, you can set the Spark configuration option spark.databricks.kinesis.failOnDataLoss to false, which allows the job to continue even if data has aged out. Also, make sure the job runs frequently enough to process the data before it expires.
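For example, the override is a single Spark configuration setting. A minimal sketch (it can also be placed in the cluster's Spark config instead of being set in the session):

# Let the Kinesis source skip records that have already aged out of the
# stream instead of failing the query (this accepts potential data loss).
spark.conf.set("spark.databricks.kinesis.failOnDataLoss", "false")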
