โ08-20-2021 07:03 AM
Hi guys, how are you ?
Can you help me ? that my situation When I try to read a websocket with readStream I receive a unknow error exception
java.net.UnknownHostException
That's my code
wssocket = spark\
.readStream\
.format("socket")\
.option("host", "wss://stream.binance.com/ws/btcusdt@trade")\
.option("port", 9443)\
.load()
> wssocket:pyspark.sql.dataframe.DataFrame = [value: string]
wssocket.isStreaming
> True
query = wssocket.writeStream\ .format("console")\ .start()
> java.net.UnknownHostException: wss://stream.binance.com/ws/btcusdt@trade at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:607) at java.net.Socket.connect(Socket.java:556) at java.net.Socket.<init>(Socket.java:452) at java.net.Socket.<init>(Socket.java:229) at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchStream.initialize(TextSocketMicroBatchStream.scala:71) at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchStream.planInputPartitions(TextSocketMicroBatchStream.scala:117) at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.partitions$lzycompute(MicroBatchScanExec.scala:45) at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.partitions(MicroBatchScanExec.scala:45) at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:87) at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:86) at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.supportsColumnar(MicroBatchScanExec.scala:30) at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:121) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:69) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:69) at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:489) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$4(QueryPlanner.scala:85) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:82) at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$4(QueryPlanner.scala:85) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:82) at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75) at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:489) at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:129) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:134) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:178) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:178) at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:129) at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:122) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:141) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:141) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:136) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:597) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:586) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$4(MicroBatchExecution.scala:243) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:647) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:240) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:209) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:203) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:366) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:341) at org.apache.spark.sql.execution.streaming.StreamExecution$anon$1.run(StreamExecution.scala:268)
Tank you
โ09-16-2021 03:44 AM
It will definitely create a streaming object. So, don't go by wssocket.isStreaming = True
piece. Also, it will create the streaming object without any issue. Since lazy evaluation
Now, coming to the issue, please put the IP directly, sometimes the slashes create some issues.
wss://stream.binance.com/ws/btcusdt@trade may not work, but 127.9.3.1 may work.
Alternatively, that person may need to put a forward or backward slash towards the end:
wss://stream.binance.com/ws/btcusdt@trade/
Reference: https://discourse.igniterealtime.org/t/java-net-unknownhostexception-but-other-pcs-connect/58084
โ09-09-2021 03:52 AM
Hi @ WilliamScardua ! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers on the community have an answer to your question first. Or else I will follow up shortly with a response.
โ09-23-2021 08:23 AM
Thank you @Kaniz Fatmaโ
โ09-16-2021 03:44 AM
It will definitely create a streaming object. So, don't go by wssocket.isStreaming = True
piece. Also, it will create the streaming object without any issue. Since lazy evaluation
Now, coming to the issue, please put the IP directly, sometimes the slashes create some issues.
wss://stream.binance.com/ws/btcusdt@trade may not work, but 127.9.3.1 may work.
Alternatively, that person may need to put a forward or backward slash towards the end:
wss://stream.binance.com/ws/btcusdt@trade/
Reference: https://discourse.igniterealtime.org/t/java-net-unknownhostexception-but-other-pcs-connect/58084
โ04-04-2022 01:09 AM
Hi @William Scarduaโ , Were you able to resolve your problem? Did @Deepak Bhutadaโ 's suggestion help you in any way? Do you need any more help?
Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections.
Click here to register and join today!
Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.