01-27-2022 02:39 AM
I have a Spark Structured Streaming job which reads from 2 Delta tables in streams , processes the data and then writes to a 3rd Delta table. The job is being run with the Databricks service on GCP.
Sometimes the job fails with the following exception. The exception is intermittent and so far I have not been able to come up with the steps to reproduce the issue. What can be the cause for this exception?
Caused by: java.io.IOException: Error reading streaming state file dbfs:/checkpoint_loc/state/0/183/26.snapshot of HDFSStateStoreProvider[id = (op=0,part=183),dir = dbfs:/checkpoint_loc/state/0/183]: premature EOF reached
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.readSnapshotFile(HDFSBackedStateStoreProvider.scala:642)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$3(HDFSBackedStateStoreProvider.scala:437)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$2(HDFSBackedStateStoreProvider.scala:437)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:417)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getLoadedMapForStore(HDFSBackedStateStoreProvider.scala:245)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:229)
at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:500)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:125)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1620)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
01-27-2022 02:45 PM
Hello, @Rohan Bawdekar - My name is Piper, and I'm a moderator for the Databricks community. Welcome to the community, and thank you for asking!
Let's give it a while longer for the members of the community to respond before we come back to this.
02-07-2022 08:26 AM
@Rohan Bawdekar - You are not forgotten. We are looking for someone to help you.
02-08-2022 04:31 PM
Hi @Rohan Bawdekar,
Could you share your writeStream code? I would like to know what you use for option("checkpointLocation", "<storage_location>").
Thank you,
--Jose
02-08-2022 08:45 PM
Hi @Jose Gonzalez,
Thanks for the reply.
The write stream is as follows:
dataset.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "dbfs:/checkpoints_v1/<table_name>")
  .option("mergeSchema", "true")
  .table("<table_name>")
The dataset is created by reading records from the Delta tables as streams and applying the flatMapGroupsWithState API to them; a simplified sketch of the pipeline's shape is below.
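For context, the pipeline has roughly the following shape. The Event, SessionState, and SessionOutput types and the state update logic here are simplified placeholders, not the actual job code:

// Simplified placeholder pipeline; the real event/state types and logic differ.
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._  // assumes `spark` is the active SparkSession

case class Event(key: String, value: Long)
case class SessionState(count: Long)
case class SessionOutput(key: String, count: Long)

// Keeps a running count per key in the streaming state store.
def updateState(
    key: String,
    events: Iterator[Event],
    state: GroupState[SessionState]): Iterator[SessionOutput] = {
  val current = state.getOption.getOrElse(SessionState(0L))
  val updated = SessionState(current.count + events.size)
  state.update(updated)
  Iterator(SessionOutput(key, updated.count))
}

val dataset = spark.readStream
  .format("delta")
  .table("<source_table>")
  .as[Event]
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout)(updateState)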
A few observations:
1) The error is more likely to occur at higher loads. With an input rate of 12,000 records/second into the flatMapGroupsWithState operator, the error occurs regularly, generally within 2 hours of the start of the job. At lower loads of around 4,000 records/second, the error occurs infrequently.
2) The size of the snapshot file for which the error occurs is 0 bytes.
Let me know if you require any other information.
Thanks & Regards,
Rohan
02-15-2022 04:27 AM
Hi @Jose Gonzalez,
Do you require any more information regarding the code? Any idea what could be the cause of the issue?
Thanks and Regards,
Rohan
02-16-2022 11:04 AM
Hi @Rohan Bawdekar,
You are using a DBFS mount point in your checkpoint path. What does it reference? Cloud storage?
Thank you,
--Jose
02-16-2022 09:37 PM
Hi @Jose Gonzalez,
Yes, it is referring to the GCP storage.
Thanks and Regards,
Rohan
03-07-2022 03:22 PM
You are using the open source HDFS state store provider, and there is an issue when it reads your checkpoint. I would highly recommend using the RocksDB state store instead: https://docs.databricks.com/spark/latest/structured-streaming/production.html#configure-rocksdb-stat...
Streaming state store providers: https://github.com/apache/spark/tree/master/sql/core/src/main/scala/org/apache/spark/sql/execution/s...
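To enable it, set the provider class before starting the streaming query. A minimal sketch per the linked docs (in a notebook or in the cluster's Spark config; note this is the Databricks provider class, not the open source one):

// Switch the streaming state store to RocksDB (Databricks provider class, per the docs above).
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

As far as I know, the state store provider cannot be changed for an existing checkpoint, so this typically means restarting the query with a fresh checkpoint location.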