
ERROR: Some streams terminated before this command could finish!

VN11111
New Contributor III

I have a Databricks notebook that reads a stream from Azure Event Hubs.

My code does the following:

1. Configure the path for Event Hubs

2. Read the stream

df_read_stream = (spark.readStream
                 .format("eventhubs")
                 .options(**conf)
                 .load())

3. Display the stream

display(df_read_stream)

Steps #1 and #2 work fine. I have also checked that df_read_stream.isStreaming is True.

However, step #3 fails with an error. I have tried clearing all state, restarting the cluster, creating a new cluster, and creating a new event hub, but none of these solved the issue. The notebook ran successfully before; after a rerun it failed, and it has not run since. Please help, and thanks!

ERROR: Some streams terminated before this command could finish!
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.eventhubs.EventHubsConf$.toConf(EventHubsConf.scala:730)
	at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:83)
	at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
	at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:322)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:97)
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:94)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:92)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:485)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:87)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:485)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:490)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1136)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1135)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:188)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:490)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:490)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1136)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1135)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:188)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:490)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:429)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:92)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:187)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:187)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:331)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:323)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:250)

ACCEPTED SOLUTION: Hubert-Dudek's reply containing the ehConf connection settings, shown in full in the replies below.

5 REPLIES

Hubert-Dudek
Esteemed Contributor III

Please specify checkpointLocation in your code. If you have already specified it, delete the files that are in that location.

The display function also uses a checkpoint, and it is better not to use display in production.

For development purposes, here is the full call for displaying a streaming DataFrame (it is also good to add an interval when developing or debugging, so that the display is updated less frequently):

display(streaming_df, processingTime = "5 seconds", checkpointLocation = "dbfs:/<checkpoint-path>")

VN11111
New Contributor III

Hi Hubert,

Thanks for the reply. After defining and adding processingTime and a checkpointLocation path, I still encounter an error.

Updated code looks like this:

display(df_read_stream, processingTime = "5 seconds", checkpointLocation = checkpointPath)

1st try - error message is:

ERROR: Some streams terminated before this command could finish!
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.eventhubs.EventHubsConf$.toConf(EventHubsConf.scala:730)
	at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:83)
	at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
	at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:326)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:98)
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:93)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:484)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:484)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:489)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren(LogicalPlan.scala:196)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren$(LogicalPlan.scala:195)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:489)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:489)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren(LogicalPlan.scala:196)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren$(LogicalPlan.scala:195)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
	at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:489)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:460)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:428)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:93)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:163)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:163)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:353)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)

2nd try - error message is:

AnalysisException: This query does not support recovering from checkpoint location. Delete /mnt/enterprise/testingstream/checkpoint/offsets to start over.
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-299622970692828> in <module>
      1 mount_path = "/mnt/enterprise/testingstream"
      2 checkpointPath = mount_path + "/checkpoint"
----> 3 display(df_read_stream, processingTime = "5 seconds", checkpointLocation = checkpointPath)
 
/databricks/python_shell/scripts/PythonShellImpl.py in display(self, input, *args, **kwargs)
    979             # exists and then if it does, whether it is actually streaming.
    980             if hasattr(input, 'isStreaming') and input.isStreaming:
--> 981                 handleStreamingDataFrame(input, self.sc, self.sqlContext, self.entry_point, kwargs)
    982             else:
    983                 if kwargs.get('streamName'):
 
/databricks/python_shell/dbruntime/display.py in handleStreamingDataFrame(input, sc, sqlContext, entry_point, kwargs)
    110                       .DisplayHelper.getStreamName())
    111 
--> 112     entry_point.getDriverSparkHooks().displayStreamingDataFrame(input._jdf, name, trigger,
    113                                                                 kwargs.get('checkpointLocation'))
    114 
 
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
 
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise
 
AnalysisException: This query does not support recovering from checkpoint location. Delete /mnt/enterprise/testingstream/checkpoint/offsets to start over.

So I used the code below to remove the checkpoint and ran it again. The error message is the same as on the 1st try.

dbutils.fs.rm(checkpointPath,True)
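An alternative to deleting the directory between debugging runs is to hand display an empty checkpoint directory each time, which should avoid the "does not support recovering from checkpoint location" message because there are no old offsets to recover from. The following is a minimal sketch under that assumption; the helper name fresh_checkpoint_path is hypothetical and not part of any Databricks API, and the directories left behind by earlier runs still need to be cleaned up separately.

```python
import uuid


def fresh_checkpoint_path(base_path):
    """Return a new, unique checkpoint directory path under base_path.

    Hypothetical helper for debugging runs: every call yields a path that
    has never been used, so no earlier offsets exist at that location.
    """
    return f"{base_path.rstrip('/')}/checkpoint-{uuid.uuid4().hex}"


# Usage in a notebook cell (display is provided by the Databricks runtime):
# checkpointPath = fresh_checkpoint_path("/mnt/enterprise/testingstream")
# display(df_read_stream, processingTime="5 seconds", checkpointLocation=checkpointPath)
```

This only sidesteps the checkpoint message. It does not address the None.get error from the 1st try.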

Another question: in my experience, I usually define checkpointLocation when using writeStream rather than readStream. My understanding is that when a writeStream stops unexpectedly partway through, it can detect where it left off and resume from the checkpoint location. However, I am not sure how to stop a readStream, or how checkpointLocation works with readStream. Could you explain this as well? Thank you very much!

Hubert-Dudek
Esteemed Contributor III

Yes, correct: the checkpoint is for writeStream and display. When you don't specify one in display, it uses a default location.

Have you installed the latest version of the com.microsoft.azure:azure-eventhubs-spark library?

I am also including the Event Hubs connection configuration that I am using:

ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt("Endpoint=sb://sla-stream.servicebus.windows.net/;SharedAccessKeyName=xxxx;SharedAccessKey=xxxxx;EntityPath=xxxx")
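To illustrate the point about where the checkpoint belongs: readStream only defines the source, while the checkpoint is attached to the query that writeStream (or display) starts. That query is also the thing you stop. Below is a minimal sketch, assuming a Delta file sink; the function name start_file_sink, the sink format, and the placeholder paths are illustrative assumptions and do not come from this thread.

```python
def start_file_sink(streaming_df, output_path, checkpoint_path, interval="5 seconds"):
    """Start a streaming write and return the StreamingQuery handle.

    Progress is tracked in checkpoint_path, so a restarted query with the
    same checkpoint can continue from where the previous run stopped.
    """
    return (
        streaming_df.writeStream
        .format("delta")                                # assumed sink format
        .option("checkpointLocation", checkpoint_path)  # one checkpoint per query
        .trigger(processingTime=interval)               # micro-batch interval
        .start(output_path)
    )


# Usage in a notebook cell:
# query = start_file_sink(df_read_stream, "<output-path>", "<checkpoint-path>")
# query.stop()  # stops the running query; the readStream definition itself has nothing to stop
```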

VN11111
New Contributor III

Thanks for your help! I have found the root cause now!

My configuration was:

conf = {}
conf["EVENT_HUB_CONN_STR"] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt("Endpoint=sb://***.servicebus.windows.net/;SharedAccessKeyName=xxxx;SharedAccessKey=xxxxx;EntityPath=xxxx")

It worked after I changed it to the following:

conf = {}
conf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt("Endpoint=sb://***.servicebus.windows.net/;SharedAccessKeyName=xxxx;SharedAccessKey=xxxxx;EntityPath=xxxx")

For additional context, I'm using the library below.

com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.21
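The None.get raised inside EventHubsConf.toConf is consistent with this root cause: the connector looked for an option that was not in the dictionary. A small pre-flight check on the options dict can surface such a key typo before the stream is started. This is a minimal sketch; the helper name missing_eventhubs_options is hypothetical, and treating eventhubs.connectionString as the only required key is an assumption based on this thread rather than on the connector's full option list.

```python
def missing_eventhubs_options(conf, required=("eventhubs.connectionString",)):
    """Return the required option names that are absent or empty in conf."""
    return [key for key in required if not conf.get(key)]


# The key from the failing configuration is not one the connector reads.
failing = {"EVENT_HUB_CONN_STR": "<encrypted connection string>"}
working = {"eventhubs.connectionString": "<encrypted connection string>"}

print(missing_eventhubs_options(failing))  # ['eventhubs.connectionString']
print(missing_eventhubs_options(working))  # []
```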

guru1
New Contributor II

I am also facing the same issue, using this cluster:

11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12)

Library: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.21

Please help me with this.

conf = {}
conf["eventhubs.connectionString"] = "Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=policyeh;SharedAccessKey=xxxx:=;EntityPath=xxxx "

read_df = (
    spark
    .readStream
    .format("eventhubs")
    .options(**conf)
    .load()
)

display(read_df)
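One visible difference from the configuration reported as working earlier in this thread is that the connection string here is passed as plain text rather than through EventHubsUtils.encrypt, and it ends with a trailing space. The thread does not confirm that either of these causes the failure in this case. The sketch below only shows how the options dict could be built the same way as the working example; the helper name build_eventhubs_conf is hypothetical, and sc is assumed to be the notebook's SparkContext.

```python
def build_eventhubs_conf(sc, connection_string):
    """Build the Event Hubs options dict with an encrypted connection string.

    Leading and trailing whitespace is stripped before encryption, and the
    encrypt call is the same one used in the working example in this thread.
    """
    encrypt = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt
    return {"eventhubs.connectionString": encrypt(connection_string.strip())}


# Usage in a notebook cell (sc and spark are provided by the Databricks runtime):
# conf = build_eventhubs_conf(sc, "<connection-string>")
# read_df = spark.readStream.format("eventhubs").options(**conf).load()
```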
