VN11111
New Contributor III

Hi Hubert,

Thanks for the reply. After defining and adding processingTime and checkpointLocation path, I still encounter error.

Updated code looks like this:

display(df_read_stream, processingTime = "5 seconds", checkpointLocation = checkpointPath)

1st try - error message is:

ERROR: Some streams terminated before this command could finish!
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.eventhubs.EventHubsConf$.toConf(EventHubsConf.scala:730)
	at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:83)
	at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
	at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:326)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:98)
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:93)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:484)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:484)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:489)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren(LogicalPlan.scala:196)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren$(LogicalPlan.scala:195)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:489)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:489)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren(LogicalPlan.scala:196)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren$(LogicalPlan.scala:195)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:489)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:460)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:428)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:93)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:163)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:163)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:353)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)

2nd try - error message is:

AnalysisException: This query does not support recovering from checkpoint location. Delete /mnt/enterprise/testingstream/checkpoint/offsets to start over.
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-299622970692828> in <module>
      1 mount_path = "/mnt/enterprise/testingstream"
      2 checkpointPath = mount_path + "/checkpoint"
----> 3 display(df_read_stream, processingTime = "5 seconds", checkpointLocation = checkpointPath)
 
/databricks/python_shell/scripts/PythonShellImpl.py in display(self, input, *args, **kwargs)
    979             # exists and then if it does, whether it is actually streaming.
    980             if hasattr(input, 'isStreaming') and input.isStreaming:
--> 981                 handleStreamingDataFrame(input, self.sc, self.sqlContext, self.entry_point, kwargs)
    982             else:
    983                 if kwargs.get('streamName'):
 
/databricks/python_shell/dbruntime/display.py in handleStreamingDataFrame(input, sc, sqlContext, entry_point, kwargs)
    110                       .DisplayHelper.getStreamName())
    111 
--> 112     entry_point.getDriverSparkHooks().displayStreamingDataFrame(input._jdf, name, trigger,
    113                                                                 kwargs.get('checkpointLocation'))
    114 
 
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
 
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise
 
AnalysisException: This query does not support recovering from checkpoint location. Delete /mnt/enterprise/testingstream/checkpoint/offsets to start over.

So, I use below code to remove checkpoint and run again. The error message is same is 1st try.

dbutils.fs.rm(checkpointPath,True)

Another question is, in past experience, I usually define checkpointLocation when I'm using spark.writeStream instead of spark.readStream. My understanding is, when the writeStream stopped in the middle unexpectedly, it can detect where it left off and resume from the checkpoint location. However, I am not sure how to stop the readStream and how does checkpointLocation works in readStream. May I understand more about this as well? Thank you very much!