- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
08-20-2021 07:03 AM
Hi guys, how are you ?
Can you help me ? that my situation When I try to read a websocket with readStream I receive a unknow error exception
java.net.UnknownHostException
That's my code
wssocket = spark\
.readStream\
.format("socket")\
.option("host", "wss://stream.binance.com/ws/btcusdt@trade")\
.option("port", 9443)\
.load()
> wssocket:pyspark.sql.dataframe.DataFrame = [value: string]
wssocket.isStreaming
> True
query = wssocket.writeStream\ .format("console")\ .start()
> java.net.UnknownHostException: wss://stream.binance.com/ws/btcusdt@trade at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:607) at java.net.Socket.connect(Socket.java:556) at java.net.Socket.<init>(Socket.java:452) at java.net.Socket.<init>(Socket.java:229) at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchStream.initialize(TextSocketMicroBatchStream.scala:71) at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchStream.planInputPartitions(TextSocketMicroBatchStream.scala:117) at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.partitions$lzycompute(MicroBatchScanExec.scala:45) at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.partitions(MicroBatchScanExec.scala:45) at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:87) at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:86) at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.supportsColumnar(MicroBatchScanExec.scala:30) at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:121) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:69) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:69) at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:489) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$4(QueryPlanner.scala:85) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:82) at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$4(QueryPlanner.scala:85) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:82) at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75) at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:489) at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:129) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:134) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:178) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:178) at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:129) at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:122) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:141) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:141) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:136) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:597) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:586) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$4(MicroBatchExecution.scala:243) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:647) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:240) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:209) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:203) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:366) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:341) at org.apache.spark.sql.execution.streaming.StreamExecution$anon$1.run(StreamExecution.scala:268)
Tank you
- Labels:
-
Readstream
-
Websocket
Accepted Solutions
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-16-2021 03:44 AM
It will definitely create a streaming object. So, don't go by wssocket.isStreaming = True
piece. Also, it will create the streaming object without any issue. Since lazy evaluation
Now, coming to the issue, please put the IP directly, sometimes the slashes create some issues.
wss://stream.binance.com/ws/btcusdt@trade may not work, but 127.9.3.1 may work.
Alternatively, that person may need to put a forward or backward slash towards the end:
wss://stream.binance.com/ws/btcusdt@trade/
Reference: https://discourse.igniterealtime.org/t/java-net-unknownhostexception-but-other-pcs-connect/58084
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-23-2021 08:23 AM
Thank you @Kaniz Fatma
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-16-2021 03:44 AM
It will definitely create a streaming object. So, don't go by wssocket.isStreaming = True
piece. Also, it will create the streaming object without any issue. Since lazy evaluation
Now, coming to the issue, please put the IP directly, sometimes the slashes create some issues.
wss://stream.binance.com/ws/btcusdt@trade may not work, but 127.9.3.1 may work.
Alternatively, that person may need to put a forward or backward slash towards the end:
wss://stream.binance.com/ws/btcusdt@trade/
Reference: https://discourse.igniterealtime.org/t/java-net-unknownhostexception-but-other-pcs-connect/58084