01-27-2022 02:39 AM
I have a Spark Structured Streaming job that reads from two Delta tables as streams, processes the data, and writes to a third Delta table. The job runs on the Databricks service on GCP.
Sometimes the job fails with the following exception. The failure is intermittent, and so far I have not been able to come up with steps to reproduce it. What could be the cause of this exception?
Caused by: java.io.IOException: Error reading streaming state file dbfs:/checkpoint_loc/state/0/183/26.snapshot of HDFSStateStoreProvider[id = (op=0,part=183),dir = dbfs:/checkpoint_loc/state/0/183]: premature EOF reached
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.readSnapshotFile(HDFSBackedStateStoreProvider.scala:642)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$3(HDFSBackedStateStoreProvider.scala:437)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$2(HDFSBackedStateStoreProvider.scala:437)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:417)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getLoadedMapForStore(HDFSBackedStateStoreProvider.scala:245)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:229)
at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:500)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:125)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1620)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
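For context, the overall shape of the job is roughly the following; the table names and the processing step are placeholders, not the actual code:

// Simplified sketch only: the real job applies stateful processing
// between the reads and the write (details further down in this thread).
val streamA = spark.readStream.format("delta").table("<source_table_1>")
val streamB = spark.readStream.format("delta").table("<source_table_2>")

val processed = streamA.unionByName(streamB) // placeholder for the real transformation

processed.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "dbfs:/checkpoint_loc")
  .toTable("<target_table>")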
01-27-2022 02:45 PM
Hello, @Rohan Bawdekar - My name is Piper, and I'm a moderator for the Databricks community. Welcome to the community and thank you for asking!
Let's give the members of the community a little while longer to respond before we come back to this. 🙂
02-07-2022 08:26 AM
@Rohan Bawdekar - You are not forgotten. We are looking for someone to help you.
02-08-2022 04:31 PM
Hi @Rohan Bawdekar ,
Could you share your writeStream code? I would like to know what you use for option("checkpointLocation", "<storage_location>").
Thank you,
--Jose
02-08-2022 08:45 PM
Hi @Jose Gonzalez ,
Thanks for the reply.
The writeStream code is as follows:
dataset.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "dbfs:/checkpoints_v1/<table_name>")
  .option("mergeSchema", "true")
  .table("<table_name>")
The Dataset is created by reading records from the Delta tables as a stream and applying the flatMapGroupsWithState API to them; a simplified sketch follows.
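Here is a simplified sketch of that pipeline, with hypothetical Event and Agg types standing in for the real record and state classes (the actual update logic is not shown in this thread):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

// Hypothetical record and state types; placeholders for the real schema.
case class Event(key: String, value: Long)
case class Agg(key: String, total: Long)

// Keeps a running total per key and emits the updated aggregate.
def updateState(key: String, events: Iterator[Event], state: GroupState[Agg]): Iterator[Agg] = {
  val current = state.getOption.getOrElse(Agg(key, 0L))
  val updated = current.copy(total = current.total + events.map(_.value).sum)
  state.update(updated)
  Iterator.single(updated)
}

val dataset = spark.readStream
  .format("delta")
  .table("<source_table>")
  .as[Event]
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout)(updateState)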
A few observations:
1) The error is more likely at higher loads. At an input rate of 12,000 records/second into the flatMapGroupsWithState API, the error occurs regularly, generally within 2 hours of the start of the job. At lower loads of around 4,000 records/second, the error occurs infrequently.
2) The size of the snapshot file for which the error occurs is 0 bytes.
Let me know if you require any other information.
Thanks & Regards,
Rohan
02-15-2022 04:27 AM
Hi @Jose Gonzalez ,
Do you require any more information regarding the code? Any idea what could be the cause of the issue?
Thanks and Regards,
Rohan
02-16-2022 11:04 AM
Hi @Rohan Bawdekar ,
You are using a DBFS mount point in your checkpoint location. What does it reference? Cloud storage?
Thank you,
--Jose
02-16-2022 09:37 PM
Hi @Jose Gonzalez ,
Yes, it is referring to the GCP storage.
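For reference, assuming a Databricks notebook, the mount-to-storage mapping can be confirmed with dbutils.fs.mounts():

// Lists each DBFS mount point and its backing storage URI
// (e.g. a gs:// bucket on GCP).
dbutils.fs.mounts()
  .foreach(m => println(s"${m.mountPoint} -> ${m.source}"))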
Thanks and Regards,
Rohan
03-07-2022 03:22 PM
You are using the open-source HDFS state store provider, and there is an issue when it tries to read your checkpoint. I would highly recommend using the RocksDB state store instead: https://docs.databricks.com/spark/latest/structured-streaming/production.html#configure-rocksdb-stat...
Streaming state store providers: https://github.com/apache/spark/tree/master/sql/core/src/main/scala/org/apache/spark/sql/execution/s...
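A minimal sketch of the switch, assuming a Databricks runtime as in the linked docs (note that the state store provider cannot be changed for an existing checkpoint, so the query needs a fresh checkpoint location):

// Set before starting the streaming query.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)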