08-20-2021 07:03 AM
Hi guys, how are you?
Can you help me? Here is my situation: when I try to read a WebSocket with readStream, I get an unknown host exception:
java.net.UnknownHostException
Here is my code:
wssocket = spark\
.readStream\
.format("socket")\
.option("host", "wss://stream.binance.com/ws/btcusdt@trade")\
.option("port", 9443)\
.load()
> wssocket:pyspark.sql.dataframe.DataFrame = [value: string]
wssocket.isStreaming
> True
query = wssocket.writeStream\
.format("console")\
.start()
> java.net.UnknownHostException: wss://stream.binance.com/ws/btcusdt@trade at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:607) at java.net.Socket.connect(Socket.java:556) at java.net.Socket.<init>(Socket.java:452) at java.net.Socket.<init>(Socket.java:229) at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchStream.initialize(TextSocketMicroBatchStream.scala:71) at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchStream.planInputPartitions(TextSocketMicroBatchStream.scala:117) at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.partitions$lzycompute(MicroBatchScanExec.scala:45) at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.partitions(MicroBatchScanExec.scala:45) at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar(DataSourceV2ScanExecBase.scala:87) at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase.supportsColumnar$(DataSourceV2ScanExecBase.scala:86) at org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec.supportsColumnar(MicroBatchScanExec.scala:30) at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.apply(DataSourceV2Strategy.scala:121) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:69) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:69) at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:489) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75) at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$4(QueryPlanner.scala:85) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:82) at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$4(QueryPlanner.scala:85) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429) at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:82) at scala.collection.Iterator$anon$11.nextCur(Iterator.scala:484) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:490) at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100) at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75) at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:489) at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:129) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:134) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:178) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:178) at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:129) at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:122) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:141) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:141) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:136) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:597) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:586) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$4(MicroBatchExecution.scala:243) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:647) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:240) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:209) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:203) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:366) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$runStream(StreamExecution.scala:341) at org.apache.spark.sql.execution.streaming.StreamExecution$anon$1.run(StreamExecution.scala:268)
Thank you
09-16-2021 03:44 AM
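The streaming object will always be created without any issue, so don't read anything into wssocket.isStreaming being True.
Spark evaluates lazily: no connection to the host is attempted until the query is started, which is why the exception only shows up after writeStream ... start().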
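Now, coming to the issue: please put a bare host name or IP address in the host option, because the scheme and the slashes are what cause the problem. The stack trace shows the socket source opening a plain java.net.Socket, which treats the whole string as a host name and cannot resolve it.
wss://stream.binance.com/ws/btcusdt@trade will not resolve, but a bare host such as stream.binance.com or an IP such as 127.9.3.1 can.
Adding a forward or backward slash to the end is unlikely to help, since the string still is not a host name:
wss://stream.binance.com/ws/btcusdt@trade/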
Reference: https://discourse.igniterealtime.org/t/java-net-unknownhostexception-but-other-pcs-connect/58084
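As a rough sketch of the "bare host" suggestion, the URL from the question can be split with Python's standard urllib.parse before it is handed to the socket source. The helper name socket_options and its default_port parameter are made up for this illustration and are not part of Spark; the Spark call is left as a comment because it needs a running session.

```python
from urllib.parse import urlsplit


def socket_options(url, default_port=9443):
    """Split a URL into the bare host and port that a TCP socket expects.

    Hypothetical helper for illustration only; not a Spark API.
    """
    parts = urlsplit(url)
    if parts.hostname is None:
        raise ValueError(f"no host found in {url!r}")
    # Fall back to default_port when the URL carries no explicit port.
    return {"host": parts.hostname, "port": parts.port or default_port}


opts = socket_options("wss://stream.binance.com/ws/btcusdt@trade")
print(opts)  # {'host': 'stream.binance.com', 'port': 9443}

# With a SparkSession available, the options would be passed like this:
# wssocket = (spark.readStream
#     .format("socket")
#     .option("host", opts["host"])
#     .option("port", opts["port"])
#     .load())
```

This should get past the UnknownHostException, but note that the socket source reads newline-delimited text over a plain TCP connection and performs neither the TLS nor the WebSocket handshake, so a wss:// endpoint is unlikely to deliver usable messages this way. A separate WebSocket client feeding data into something Spark can read is probably needed.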
09-23-2021 08:23 AM
Thank you @Kaniz Fatma