03-23-2022 02:31 AM
I have a Databricks notebook that reads a stream from Azure Event Hubs.
My code does the following:
1. Configure the Event Hubs connection
2. Read the stream
df_read_stream = (spark.readStream
.format("eventhubs")
.options(**conf)
.load())
3. Display the stream
display(df_read_stream)
Steps #1 and #2 work fine, and I have verified that df_read_stream.isStreaming is True.
However, step #3 fails with the error below. I have tried clearing all state, restarting the cluster, creating a new cluster, and creating a new Event Hub, but nothing resolves the issue. The notebook ran successfully before; after a rerun it failed and now will not run at all. Please help, and thanks!
ERROR: Some streams terminated before this command could finish!
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at org.apache.spark.eventhubs.EventHubsConf$.toConf(EventHubsConf.scala:730)
at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:83)
at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:322)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:97)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:94)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:92)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:485)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:87)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:485)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:490)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1136)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1135)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:490)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:490)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1136)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1135)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:490)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:429)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:92)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:187)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:187)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:331)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:323)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:250)
03-23-2022 03:20 AM
Please specify a checkpointLocation in your code. If you have already specified one, delete the files that are there.
The display function also uses a checkpoint, and it is better not to use display in production.
For development purposes, here is the full code for the streaming display function (it is also good to add a trigger interval while developing/debugging so the display updates less frequently):
display(streaming_df, processingTime = "5 seconds", checkpointLocation = "dbfs:/<checkpoint-path>")
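If a checkpoint from a previous run is causing trouble, clearing it looks like this (a sketch only; the path below is a hypothetical example):
# Remove the stale checkpoint directory so the display query starts fresh
dbutils.fs.rm("dbfs:/<checkpoint-path>", recurse=True)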
03-23-2022 04:26 AM
Hi Hubert,
Thanks for the reply. After defining a checkpoint path and adding processingTime and checkpointLocation, I still get an error.
The updated code looks like this:
display(df_read_stream, processingTime = "5 seconds", checkpointLocation = checkpointPath)
First try: the error message is:
ERROR: Some streams terminated before this command could finish!
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at org.apache.spark.eventhubs.EventHubsConf$.toConf(EventHubsConf.scala:730)
at org.apache.spark.sql.eventhubs.EventHubsSource.<init>(EventHubsSource.scala:83)
at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createSource(EventHubsSourceProvider.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:326)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$1(MicroBatchExecution.scala:98)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:93)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:484)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:484)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:489)
at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren(LogicalPlan.scala:196)
at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren$(LogicalPlan.scala:195)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:489)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:489)
at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren(LogicalPlan.scala:196)
at org.apache.spark.sql.catalyst.plans.logical.UnaryLikeLogicalPlan.mapChildren$(LogicalPlan.scala:195)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.mapChildren(LogicalPlan.scala:223)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:489)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:460)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:428)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:93)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:163)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:163)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:353)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Second try: the error message is:
AnalysisException: This query does not support recovering from checkpoint location. Delete /mnt/enterprise/testingstream/checkpoint/offsets to start over.
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<command-299622970692828> in <module>
1 mount_path = "/mnt/enterprise/testingstream"
2 checkpointPath = mount_path + "/checkpoint"
----> 3 display(df_read_stream, processingTime = "5 seconds", checkpointLocation = checkpointPath)
/databricks/python_shell/scripts/PythonShellImpl.py in display(self, input, *args, **kwargs)
979 # exists and then if it does, whether it is actually streaming.
980 if hasattr(input, 'isStreaming') and input.isStreaming:
--> 981 handleStreamingDataFrame(input, self.sc, self.sqlContext, self.entry_point, kwargs)
982 else:
983 if kwargs.get('streamName'):
/databricks/python_shell/dbruntime/display.py in handleStreamingDataFrame(input, sc, sqlContext, entry_point, kwargs)
110 .DisplayHelper.getStreamName())
111
--> 112 entry_point.getDriverSparkHooks().displayStreamingDataFrame(input._jdf, name, trigger,
113 kwargs.get('checkpointLocation'))
114
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
121 # Hide where the exception came from that shows a non-Pythonic
122 # JVM exception message.
--> 123 raise converted from None
124 else:
125 raise
AnalysisException: This query does not support recovering from checkpoint location. Delete /mnt/enterprise/testingstream/checkpoint/offsets to start over.
So I used the code below to remove the checkpoint and ran again. The error message is the same as in the first try.
dbutils.fs.rm(checkpointPath,True)
A further question: in my past experience, I usually define checkpointLocation when using spark.writeStream rather than spark.readStream. My understanding is that when a writeStream stops unexpectedly in the middle, it can detect where it left off and resume from the checkpoint location. However, I am not sure how to stop a readStream, or how checkpointLocation works with readStream. Could you help me understand this as well? Thank you very much!
03-23-2022 04:55 AM
Yes, correct: the checkpoint is for writeStream and for display. When you don't specify one in display, it uses a default location.
Have you installed the latest version of the com.microsoft.azure:azure-eventhubs-spark library?
I am also including the Event Hub connection string I am using:
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt("Endpoint=sb://sla-stream.servicebus.windows.net/;SharedAccessKeyName=xxxx;SharedAccessKey=xxxxx;EntityPath=xxxx")
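A rough sketch of that writeStream pattern (the sink format and paths here are hypothetical, not from this thread):
# Start a streaming write with an explicit checkpoint; Spark records the
# processed offsets there, so a restarted query resumes where it left off.
query = (df_read_stream.writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/<checkpoint-path>")
    .start("dbfs:/<output-path>"))

# The returned query handle is how you stop the stream cleanly.
query.stop()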
03-23-2022 05:57 AM
Thanks for your help! I have found the root cause!
My configuration was:
conf = {}
conf["EVENT_HUB_CONN_STR"] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt("Endpoint=sb://***.servicebus.windows.net/;SharedAccessKeyName=xxxx;SharedAccessKey=xxxxx;EntityPath=xxxx")
It worked after I changed it to the following:
conf = {}
conf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt("Endpoint=sb://***.servicebus.windows.net/;SharedAccessKeyName=xxxx;SharedAccessKey=xxxxx;EntityPath=xxxx")
As a further note, I'm using the library below:
com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.21
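Pulling the pieces of this thread together, a minimal end-to-end sketch (the namespace, key, and checkpoint path are placeholders, not real values):
# The connector expects the option key "eventhubs.connectionString", and
# recent versions of azure-eventhubs-spark (2.3.15+) expect the value to be
# encrypted with EventHubsUtils.encrypt.
conn_str = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<event-hub>"

conf = {}
conf["eventhubs.connectionString"] = (
    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_str))

df_read_stream = (spark.readStream
    .format("eventhubs")
    .options(**conf)
    .load())

display(df_read_stream, processingTime="5 seconds",
        checkpointLocation="dbfs:/<checkpoint-path>")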
11-22-2022 11:56 PM
I am also facing the same issue, using cluster:
11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12)
Library: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.21
Please help me with this.
conf = {}
conf["eventhubs.connectionString"] = "Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=policyeh;SharedAccessKey=xxxx:=;EntityPath=xxxx "
read_df = (
spark
.readStream
.format("eventhubs")
.options(**conf)
.load()
)
display(read_df)
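Given the resolution earlier in this thread, a likely difference in this snippet is that the connection string is passed in plain text (and carries a trailing space); a sketch of the adjusted configuration, with the placeholder values staying hypothetical:
# Encrypt the connection string as in the fix that worked above, and strip
# the trailing space after the EntityPath value.
conf = {}
conf["eventhubs.connectionString"] = (
    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
        "Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=policyeh;SharedAccessKey=xxxx;EntityPath=xxxx"))

read_df = (spark.readStream
    .format("eventhubs")
    .options(**conf)
    .load())

display(read_df)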