Hi,
I implemented a job that incrementally reads all available data from a Kinesis Data Stream and then terminates. I schedule the job daily. The data retention period of the stream is 7 days, i.e. there should be enough time to fetch newly arrived records before they expire.
The code snippet reading from Kinesis looks as follows:
import sys  # the output base path and the AWS account id are passed as CLI arguments

def dump_stream(name: str, region: str):
    print(f"Dumping stream {name} in region {region}")
    # Read the Kinesis stream from the very beginning; `spark` is the
    # SparkSession provided by the Databricks runtime.
    kinesis_df = (
        spark.readStream.format("kinesis")
        .option("streamName", name)
        .option("region", region)
        .option("initialPosition", "earliest")
        .option("roleArn", f"arn:aws:iam::{sys.argv[3]}:role/ROLENAME")
        .option("roleSessionName", "databricks")
        .load()
    )
    # The Kinesis payload arrives in the binary `data` column; cast it to a string.
    new_df = kinesis_df.selectExpr("cast (data as STRING) jsonData")
    # Write the records as text files, track progress via the checkpoint, and stop
    # once all data available at query start has been processed (availableNow).
    streaming_query = (
        new_df.writeStream.format("text")
        .option("checkpointLocation", f"{sys.argv[2]}/{name}/checkpoints/")
        .option("path", f"{sys.argv[2]}/{name}")
        .option("header", False)
        .trigger(availableNow=True)
        .start()
    )
    # Wait up to 20 seconds for the query to finish.
    streaming_query.awaitTermination(20)
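For completeness, the function is called from a small driver along these lines (the values below are placeholders; only sys.argv[2], the output base path, and sys.argv[3], the AWS account id owning ROLENAME, are used by dump_stream):

# Placeholder driver sketch; stream name and region are examples, not my actual values.
if __name__ == "__main__":
    dump_stream(name="STREAMNAME", region="eu-central-1")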
The job worked for a few days, but then I encountered the following error:
=========
org.apache.spark.SparkException: [KINESIS_COULD_NOT_READ_SHARD_UNTIL_END_OFFSET] Could not read until the desired sequence number 49651505085739424282347553331460231169802628074051731490 for shard shardId-000000000002 in kinesis stream STREAMNAME. The query will fail due to potential data loss. The last read record was at sequence number null This can happen if the data with endSeqNum has already been aged out, or the Kinesis stream was deleted and reconstructed with the same name. The failure behavior can be overridden by setting spark.databricks.kinesis.failOnDataLoss to false in spark configuration.
============
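For reference, the override mentioned in the error message would be set roughly like this (config key copied verbatim from the error, e.g. via spark.conf.set or the cluster Spark config). I have not applied it, because I would rather understand the root cause than risk silently skipping records:

# Override named in the error message; not applied in my job, shown only for reference.
spark.conf.set("spark.databricks.kinesis.failOnDataLoss", "false")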
It is my understanding that, when initialPosition is set to "earliest" in combination with a checkpoint location, the job will 1) start reading from the very beginning of the stream (TRIM_HORIZON?), 2) process all the records available at the time the availableNow trigger fires, 3) store the sequence number of the last processed record in the checkpoint, and 4) terminate. The next run then uses the sequence number stored in 3) as its starting point.
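To make point 3) concrete: Structured Streaming persists per-batch offset files under the checkpoint location, so the stored position can be inspected roughly like this (this assumes the standard <checkpoint>/offsets layout; for the Kinesis source the JSON should contain the shard sequence numbers):

# Illustration only: list and print the per-batch offset files under the checkpoint;
# assumes the standard Structured Streaming checkpoint layout.
offsets_dir = f"{sys.argv[2]}/STREAMNAME/checkpoints/offsets"
for entry in sorted(dbutils.fs.ls(offsets_dir), key=lambda e: e.name):
    print(entry.path)
    print(dbutils.fs.head(entry.path))  # offset JSON for that micro-batch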
Did I miss something?
What can cause the job to fail (apart from a stream deletion, which as far as I know did not occur)?
best,
Mathias