12-23-2022 07:44 AM
I'm able to connect to MongoDB using org.mongodb.spark:mongo-spark-connector_2.12:3.0.2 and this code:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", jdbcUrl)
It works well, but if I install the latest MongoDB Spark Connector, version 10.0.5, and try to connect using:
df = spark.read.format("mongodb").option("spark.mongodb.input.uri",jdbcUrl)
In that case the connection to the MongoDB server times out. Any ideas? I need to work with the 10.x libraries because they allow structured streaming from MongoDB.
01-09-2023 02:49 AM
Hi all, I finally found the solution myself. The problem was with the option names, which changed in the latest version.
Working code:
spark.read.format("mongodb").option("spark.mongodb.read.connection.uri" ....
Thanks!
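A minimal sketch of the option change described above, assuming the 3.x reader takes the URI under "uri" (as in the original question) and the 10.x reader takes it under "spark.mongodb.read.connection.uri" (as in the working code). The helper name mongo_read_config is hypothetical, and the "database" and "collection" keys are carried over from the code in the traceback posted in this thread; check them against the connector documentation for your version.

```python
def mongo_read_config(uri, database, collection, connector_major):
    """Return (format_name, options) for a given connector major version.

    Hypothetical helper: it only builds the arguments, it does not touch Spark.
    """
    if connector_major >= 10:
        # 10.x: new format name and new URI option key (from the accepted answer).
        return "mongodb", {
            "spark.mongodb.read.connection.uri": uri,
            "database": database,
            "collection": collection,
        }
    # 3.x: format name and URI key used in the original question.
    return "com.mongodb.spark.sql.DefaultSource", {
        "uri": uri,
        "database": database,
        "collection": collection,
    }


# Usage in a notebook (requires a live Spark session and the connector library):
# fmt, opts = mongo_read_config(jdbcUrl, "my_db", "my_collection", connector_major=10)
# df = spark.read.format(fmt).options(**opts).load()
```

Keeping the version switch in one place makes it harder to mix the old "spark.mongodb.input.uri" key with the new "mongodb" format, which is the combination that failed here.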
12-23-2022 08:41 AM
Please run the %sh magic command in the notebook to check whether the cluster has a route to the server (it could be a network issue):
%sh
telnet <mongodb_server_name> 27017
12-23-2022 09:27 AM
Hi @Abel Martinez
Validate the connection between the cluster and the MongoDB server. Run nc against the server URL and see whether the connection works on the MongoDB port (27017):
%sh nc -zv <url> <port>
%sh curl -vvv URL
If there are no issues on the networking side please share the complete error message to check further.
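If nc or telnet is not available on the cluster, roughly the same reachability check can be sketched in Python with the standard library. The function name can_connect and the 5-second default timeout are assumptions for illustration; the host and port placeholders must be replaced with your own values.

```python
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port can be opened, else False."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


# Usage in a notebook cell (replace the placeholders with your server and port):
# print(can_connect("<mongodb_server_name>", 27017))
```

A True result only shows that the port accepts TCP connections from the driver; it says nothing about authentication, TLS, or the options passed to the connector.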
12-23-2022 02:07 PM
Please also check for certificate issues: https://gitlab.besedo.com/operation-tools/ebay-social-media-report/-/blob/master/classes/mailbox.py
12-23-2022 08:33 PM
There can be three causes:
1- Your firewall is blocking the IP.
2- Your logs are receiving a large amount of traffic.
3- It could be related to the Databricks deployment in your workspace.
12-27-2022 03:08 PM
Do you see any error message in your driver logs? If you do, please share it here.
01-09-2023 02:17 AM
Hi all, thanks for your answers.
This is not a connection issue: the same code, with the same IPs and ports, works using format("com.mongodb.spark.sql.DefaultSource") but does not work using format("mongodb").
Here is the error message from the driver logs:
23/01/09 10:01:19 INFO cluster: Exception in monitor thread while connecting to server localhost:27017
com.mongodb.MongoSocketOpenException: Exception opening socket
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)
at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:180)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:188)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:152)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:613)
at com.mongodb.internal.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:107)
at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:79)
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
... 4 more
23/01/09 10:01:49 WARN JupyterDriverLocal: User code returned error with traceback: ---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-2679801502558508> in <cell line: 3>()
      2
      3 for row in sqlDF.collect():
----> 4     getData(row)

<command-2679801502558507> in getData(row)
     43
     44
---> 45     df = spark.read.format("mongodb").option("spark.mongodb.input.uri",jdbcUrl) \
     46         .option("database", row['source_schema']) \
     47         .option("collection", row['source_table']).load()
/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
     46             start = time.perf_counter()
     47             try:
---> 48                 res = func(*args, **kwargs)
     49                 logger.log_success(
     50                     module_name, class_name, function_name, time.perf_counter() - start, signature
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    182             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    183         else:
--> 184             return self._df(self._jreader.load())
    185
    186     def json(
/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    194         def deco(*a: Any, **kw: Any) -> Any:
    195             try:
--> 196                 return f(*a, **kw)
    197             except Py4JJavaError as e:
    198                 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o604.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
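One detail in this message is worth checking: the client reports localhost:27017 rather than the host in jdbcUrl, which suggests (this is an inference from the log, not something the connector states) that the URI option was not picked up and a default address was used instead. A small sketch for pulling the server addresses out of such a message follows; the function names and the regular expression are assumptions written for the message format shown above.

```python
import re


def addresses_in_timeout_message(message):
    """Extract every 'address=host:port' value from a MongoTimeoutException message."""
    return re.findall(r"address=([^,\s}]+)", message)


def points_only_at_localhost(message):
    """Return True if the message lists servers and all of them are local addresses."""
    addresses = addresses_in_timeout_message(message)
    local_prefixes = ("localhost:", "127.0.0.1:")
    return bool(addresses) and all(a.startswith(local_prefixes) for a in addresses)


# Usage: pass the exception text from the driver log.
# if points_only_at_localhost(error_text):
#     print("The connector may be ignoring the configured URI; check the option name.")
```

If the configured server is remote and this check returns True, the option key carrying the URI is a more likely culprit than the network.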
04-11-2023 02:56 PM
Hello Abel, I'm facing a similar problem. Which option did you use for 10.0.5?
11-20-2023 11:08 PM
Hi @Abel_Martinez, I want to express my gratitude for your effort in selecting the most suitable solution. It's great to hear that your query has been successfully resolved. Thank you for your contribution.