โ01-27-2022 02:39 AM
I have a Spark Structured Streaming job which reads from 2 Delta tables in streams , processes the data and then writes to a 3rd Delta table. The job is being run with the Databricks service on GCP.
Sometimes the job fails with the following exception. The exception is intermittent and so far I have not been able to come up with the steps to reproduce the issue. What can be the cause for this exception?
Caused by: java.io.IOException: Error reading streaming state file dbfs:/checkpoint_loc/state/0/183/26.snapshot of HDFSStateStoreProvider[id = (op=0,part=183),dir = dbfs:/checkpoint_loc/state/0/183]: premature EOF reached
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.readSnapshotFile(HDFSBackedStateStoreProvider.scala:642)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$3(HDFSBackedStateStoreProvider.scala:437)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.$anonfun$loadMap$2(HDFSBackedStateStoreProvider.scala:437)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:642)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:417)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getLoadedMapForStore(HDFSBackedStateStoreProvider.scala:245)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:229)
at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:500)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:125)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1620)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
โ03-07-2022 03:22 PM
You are using open source HDFS state store provider. There is an issue when trying to read your checkpoint. I will highly recommend to use RocksDB state store https://docs.databricks.com/spark/latest/structured-streaming/production.html#configure-rocksdb-stat...
Streaming state store providers: https://github.com/apache/spark/tree/master/sql/core/src/main/scala/org/apache/spark/sql/execution/s...
 
					
				
		
โ01-27-2022 02:45 PM
Hello, @Rohan Bawdekarโ - My name is Piper, and I'm a moderator for the Databricks community. Welcome to the community and thank you for asking!
Let's give it a while longer to give the members of the community a chance to respond before we come back to this. ๐
 
					
				
		
โ02-07-2022 08:26 AM
@Rohan Bawdekarโ -You are not forgotten. We are looking for someone to help you.
โ02-08-2022 04:31 PM
Hi @Rohan Bawdekarโ ,
Could you share your writeStream code? I would like to know what do you use for option("checkpointLocation", "<storage_location>")
Thank you,
--Jose
โ02-08-2022 08:45 PM
Hi @Jose Gonzalezโ ,
Thanks for the reply.
Write stream is as follows
dataset.writeStream
                 .format("delta")
                 .outputMode("append")
                 .option("checkpointLocation", "dbfs:/checkpoints_v1/<table_name>")
                 .option("mergeSchema", "true")
                 .table("<table_name>")The Dataset is created after fetching records from delta tables in a stream and applying the flatMapGroupsWithState API on the records.
Few Observations:
1) The probability of the error occurring is more at the higher loads. If the input rate for the flatMapGroupsWithState API is 12000 records/second, then then the error occurs regularly and generally within 2 hours of the start of the job. For lower loads of 4000 records / seconds, the error occurs infrequently .
2) The size of the snapshot file for which the error occurs is 0 bytes.
Let me know if you require any other information.
Thanks & Regards,
Rohan
โ02-15-2022 04:27 AM
Hi @Jose Gonzalezโ ,
Do you require any more information regarding the code? Any idea what could be cause for the issue?
Thanks and Regards,
Rohan
โ02-16-2022 11:04 AM
Hi @Rohan Bawdekarโ ,
You are using DBFS mont point in your checkpoint, what is this reference to? storage?
Thank you,
--Jose
โ02-16-2022 09:37 PM
Hi @Jose Gonzalezโ ,
Yes, it is referring to the GCP storage.
Thanks and Regards,
Rohan
โ03-07-2022 03:22 PM
You are using open source HDFS state store provider. There is an issue when trying to read your checkpoint. I will highly recommend to use RocksDB state store https://docs.databricks.com/spark/latest/structured-streaming/production.html#configure-rocksdb-stat...
Streaming state store providers: https://github.com/apache/spark/tree/master/sql/core/src/main/scala/org/apache/spark/sql/execution/s...
 
					
				
				
			
		
 
					
				
				
			
		
Passionate about hosting events and connecting people? Help us grow a vibrant local communityโsign up today to get started!
Sign Up Now