Spark Streaming - Checkpoint State EOF Exception

RohanB
New Contributor III

I have a Spark Structured Streaming job that reads from two Delta tables as streams, processes the data, and writes the result to a third Delta table. The job runs on Databricks on GCP.

The job sometimes fails with the exception below. The failure is intermittent, and so far I have not found a way to reproduce it. What could be causing it?

Caused by: java.io.IOException: Error reading streaming state file dbfs:/checkpoint_loc/state/0/183/26.snapshot of HDFSStateStoreProvider[id = (op=0,part=183),dir = dbfs:/checkpoint_loc/state/0/183]: premature EOF reached
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.readSnapshotFile(HDFSBackedStateStoreProvider.scala:642)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$3(HDFSBackedStateStoreProvider.scala:437)
    at scala.Option.orElse(Option.scala:447)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$2(HDFSBackedStateStoreProvider.scala:437)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:417)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getLoadedMapForStore(HDFSBackedStateStoreProvider.scala:245)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:229)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:500)
    at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:125)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1620)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Accepted Solutions

You are using the open-source HDFS-backed state store provider, and it is hitting an issue when reading your checkpoint. I highly recommend using the RocksDB state store instead: https://docs.databricks.com/spark/latest/structured-streaming/production.html#configure-rocksdb-stat...

Streaming state store providers: https://github.com/apache/spark/tree/master/sql/core/src/main/scala/org/apache/spark/sql/execution/s...
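
As a rough illustration of the recommendation above, the state store provider is selected through a Spark configuration key that has to be set before the streaming query starts. The sketch below assumes a Databricks notebook or job where a SparkSession is available. The provider class shown is the Databricks one; open-source Spark 3.2 and later ships its own RocksDB provider under a different package (noted in the comment), so check which applies to your runtime.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: running inside a Databricks notebook/job, where getOrCreate()
// returns the already-running session.
val spark = SparkSession.builder().getOrCreate()

// Switch the state store from the default HDFS-backed provider to RocksDB.
// On open-source Spark 3.2+ the class would instead be
// org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)
```

State written by one provider is likely not readable by the other, so it is probably safest to pair this change with a new checkpoint location; verify this against the documentation for your runtime before relying on it.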

8 Replies

Anonymous
Not applicable

Hello, @Rohan Bawdekar - My name is Piper, and I'm a moderator for the Databricks community. Welcome to the community and thank you for asking!

Let's give it a while longer to give the members of the community a chance to respond before we come back to this. 🙂

Anonymous
Not applicable

@Rohan Bawdekar - You are not forgotten. We are looking for someone to help you.

jose_gonzalez
Moderator

Hi @Rohan Bawdekar,

Could you share your writeStream code? I would like to know what you use for option("checkpointLocation", "<storage_location>").

Thank you,

--Jose

RohanB
New Contributor III

Hi @Jose Gonzalez,

Thanks for the reply.

The write stream is as follows:

dataset.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "dbfs:/checkpoints_v1/<table_name>")
  .option("mergeSchema", "true")
  .table("<table_name>")

The Dataset is created by reading records from the Delta tables as a stream and applying the flatMapGroupsWithState API to them.

A few observations:

1) The error is more likely at higher loads. When the input rate to the flatMapGroupsWithState API is 12000 records/second, the error occurs regularly, generally within 2 hours of the job starting. At lower loads of 4000 records/second, the error occurs infrequently.

2) The snapshot file for which the error occurs is 0 bytes in size.
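
The second observation can be checked across the whole state directory rather than one file at a time. Below is a minimal sketch, assuming the checkpoint is reachable as a local filesystem path (for example through a /dbfs/... mount on the driver, which is an assumption about the environment). The names EmptySnapshotScan and findEmptySnapshots are hypothetical helpers, not part of Spark or Databricks.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.mutable.ArrayBuffer

object EmptySnapshotScan {

  /** Returns every zero-byte `.snapshot` file found anywhere under `stateDir`. */
  def findEmptySnapshots(stateDir: Path): Seq[Path] = {
    val found = ArrayBuffer.empty[Path]
    val stream = Files.walk(stateDir) // recursive walk; includes stateDir itself
    try {
      val it = stream.iterator()
      while (it.hasNext) {
        val p = it.next()
        if (Files.isRegularFile(p) &&
            p.getFileName.toString.endsWith(".snapshot") &&
            Files.size(p) == 0L) {
          found += p
        }
      }
    } finally {
      stream.close() // Files.walk holds directory handles until closed
    }
    found.toSeq
  }

  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      // The state directory must be supplied by the caller; no default is assumed.
      println("usage: EmptySnapshotScan <local path to checkpoint state directory>")
    } else {
      findEmptySnapshots(Paths.get(args(0))).foreach(println)
    }
  }
}
```

Each path printed corresponds to one operator/partition/version (as in state/0/183/26.snapshot from the stack trace), which may help correlate empty snapshots with load or with specific batches.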

Let me know if you require any other information.

Thanks & Regards,

Rohan

RohanB
New Contributor III

Hi @Jose Gonzalez,

Do you need any more information about the code? Any idea what could be causing the issue?

Thanks and Regards,

Rohan

Hi @Rohan Bawdekar,

You are using a DBFS mount point for your checkpoint. What does it refer to? Which storage?

Thank you,

--Jose

RohanB
New Contributor III

Hi @Jose Gonzalez,

Yes, it refers to GCP storage.

Thanks and Regards,

Rohan
