How to properly implement incremental batching from Kinesis Data Streams



I implemented a job that should incrementally read all the available data from a Kinesis Data Stream and then terminate. The job is scheduled to run daily. The stream's data retention period is 7 days, so there should be enough time to fetch newly arrived data before it expires.
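As a rough sanity check on that margin: when each run reads everything available at its start (availableNow-style), records can only age out if the gap between the starts of two successful runs reaches the 7-day retention period. The sketch below is a plain-Python illustration with hypothetical helper names (`data_at_risk`, `missed_runs_tolerated`). It does not talk to Kinesis and ignores how long each run takes.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=7)  # stream retention period from the question
SCHEDULE = timedelta(days=1)   # daily job schedule


def data_at_risk(last_success: datetime, next_run: datetime,
                 retention: timedelta = RETENTION) -> bool:
    """True if records that arrived just after `last_success` (the start of
    the last successful run) may have aged out before `next_run` reads them.
    A gap equal to the retention period is treated as at risk (conservative)."""
    return next_run - last_success >= retention


def missed_runs_tolerated(schedule: timedelta = SCHEDULE,
                          retention: timedelta = RETENTION) -> int:
    """Largest number of consecutive failed runs after which the next
    successful run still starts strictly within the retention period."""
    if schedule >= retention:
        raise ValueError("schedule interval must be shorter than the retention period")
    failures = 0
    # After `failures + 1` misses, the next success starts (failures + 2) intervals later.
    while (failures + 2) * schedule < retention:
        failures += 1
    return failures


if __name__ == "__main__":
    last_ok = datetime(2024, 1, 1, 6, 0)  # illustrative timestamp
    print(data_at_risk(last_ok, last_ok + timedelta(days=1)))  # False
    print(data_at_risk(last_ok, last_ok + timedelta(days=8)))  # True
    print(missed_runs_tolerated())  # 5
```

Under these assumptions, a daily job with 7-day retention can miss five consecutive runs and still catch up, so schedule gaps alone would only explain expired data if several runs in a row failed or were skipped.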

The code that reads from Kinesis looks as follows:

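import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def dump_stream(name: str, region: str):
    print(f"Dumping stream {name} in region {region}")
    kinesis_df = (
        spark.readStream.format("kinesis")
        .option("streamName", name)
        .option("region", region)
        .option("initialPosition", "earliest")
        .option("roleArn", f"arn:aws:iam::{sys.argv[3]}:role/ROLENAME")
        .option("roleSessionName", "databricks")
        .load())
    # The Kinesis payload arrives as binary; decode it to a JSON string column.
    new_df = kinesis_df.selectExpr("cast (data as STRING) jsonData")

    streaming_query = (
        new_df.writeStream.format("csv")  # assumed: sink format was elided; "header" suggests CSV
        .option("checkpointLocation", f"{sys.argv[2]}/{name}/checkpoints/")
        .option("path", f"{sys.argv[2]}/{name}")
        .option("header", False)
        .trigger(availableNow=True)  # read everything available now, then stop
        .start())
    streaming_query.awaitTermination()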

The job worked for a few days, but then it failed with the following error:


org.apache.spark.SparkException: [KINESIS_COULD_NOT_READ_SHARD_UNTIL_END_OFFSET] Could not read until the desired sequence number 49651505085739424282347553331460231169802628074051731490 for shard shardId-000000000002 in kinesis stream STREAMNAME. The query will fail due to potential data loss. The last read record was at sequence number null This can happen if the data with endSeqNum has already been aged out, or the Kinesis stream was deleted and reconstructed with the same name. The failure behavior can be overridden by setting spark.databricks.kinesis.failOnDataLoss to false in spark configuration.


My understanding is that when initialPosition is set to "earliest" in combination with a checkpoint location, the job will 1) start reading from the very beginning of the stream (TRIM_HORIZON?), 2) process all records until the availableNow trigger fires, 3) store the sequence number of the last processed record, and 4) terminate. The next run will then use the sequence number stored in step 3 as its starting point.
Did I miss something?
What could cause the job to fail, apart from the stream being deleted (which, as far as I know, did not happen)?
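The four-step model described in the question can be illustrated with a small in-memory simulation. This is a toy under stated assumptions, not how the Databricks Kinesis connector is implemented: a single shard is a list of (sequence number, arrival day) pairs, retention trims records older than 7 days, the "checkpoint" is just the last processed sequence number, and each run behaves like an availableNow run, reading everything available at its start and then stopping. The names `Shard`, `run_once` and `DataLossError` are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

RETENTION_DAYS = 7  # retention period from the question, in whole days


class DataLossError(Exception):
    """Raised when unread records were trimmed before a run could read them."""


@dataclass
class Shard:
    # Toy shard: (sequence_number, arrival_day) pairs in increasing sequence order.
    records: List[Tuple[int, int]] = field(default_factory=list)
    trimmed_up_to: int = -1  # highest sequence number removed by retention so far

    def put(self, seq: int, day: int) -> None:
        self.records.append((seq, day))

    def trim(self, today: int) -> None:
        """Drop records older than the retention period (moves the trim horizon)."""
        kept = []
        for seq, day in self.records:
            if today - day < RETENTION_DAYS:
                kept.append((seq, day))
            else:
                self.trimmed_up_to = max(self.trimmed_up_to, seq)
        self.records = kept


def run_once(shard: Shard, checkpoint: Optional[int],
             today: int) -> Tuple[List[int], Optional[int]]:
    """One availableNow-style run: start from the earliest retained record if
    there is no checkpoint, otherwise resume after the checkpointed sequence
    number; read everything available, then return (batch, new_checkpoint)."""
    shard.trim(today)
    if checkpoint is not None and shard.trimmed_up_to > checkpoint:
        raise DataLossError(
            f"records up to sequence number {shard.trimmed_up_to} aged out, "
            f"but the checkpoint is only at {checkpoint}")
    batch = [seq for seq, _ in shard.records if checkpoint is None or seq > checkpoint]
    return batch, (batch[-1] if batch else checkpoint)


if __name__ == "__main__":
    shard = Shard()
    for seq, day in [(1, 0), (2, 0), (3, 1)]:
        shard.put(seq, day)
    batch, cp = run_once(shard, None, today=1)  # first run starts from the earliest record
    print(batch, cp)  # [1, 2, 3] 3
    shard.put(4, 2)
    batch, cp = run_once(shard, cp, today=2)  # next run resumes after the checkpoint
    print(batch, cp)  # [4] 4
    shard.put(5, 3)
    try:
        # No successful run for 9 days: record 5 (day 3) has aged out by day 11.
        run_once(shard, cp, today=11)
    except DataLossError as err:
        print("failed:", err)
```

In this toy, a gap between successful runs that exceeds the retention period is the only thing that triggers the failure. The real connector may fail for other reasons, such as the stream recreation mentioned in the error message, which the sketch does not model.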





Hello @Mathias_Peters,

The KINESIS_COULD_NOT_READ_SHARD_UNTIL_END_OFFSET error means that the query could not read a shard up to the end sequence number it expected, so it failed rather than risk silently skipping data.

The error message shows that the job could not read up to sequence number 49651505085739424282347553331460231169802628074051731490 in shard shardId-000000000002 of the stream STREAMNAME (the last record read is reported as null). This can happen if the records up to that sequence number have already aged out of the stream (the retention period here is 7 days), or if the stream was deleted and recreated with the same name, which resets the sequence numbers.

Your understanding of how initialPosition="earliest" and checkpoints work is mostly correct:
1) The first run starts reading from the oldest available record (similar to TRIM_HORIZON).
2) With the availableNow trigger, the query processes all records that are available when it starts and then stops; the daily job schedule, not the trigger, starts the next run.
3) The last processed sequence number is stored in the checkpoint.
4) Subsequent runs resume from the checkpointed position.

Best Regards,