cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Why I'm getting connection timeout when connecting to MongoDB using MongoDB Connector for Spark 10.x from Databricks

Abel_Martinez
Contributor

I'm able to connect to MongoDB using org.mongodb.spark:mongo-spark-connector_2.12:3.0.2 and this code:

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", jdbcUrl)

It works well, but if I install last MongoDB Spark Connector version 10.0.5 and I try to connect using:

df = spark.read.format("mongodb").option("spark.mongodb.input.uri",jdbcUrl)

In that case MongoDB server returns connection timeout. Any ideas? I need to work with 10x libraries because they allows structured streaming from mongoDB.

1 ACCEPTED SOLUTION

Accepted Solutions

Abel_Martinez
Contributor

Hi all, finally I found the solution by my self. The problem was with the options, which has changed in the last version.

Working code:

spark.read.format("mongodb").option("spark.mongodb.read.connection.uri" ....

Thanks!

View solution in original post

9 REPLIES 9

Hubert-Dudek
Esteemed Contributor III

Please run magic command %sh in the notebook and check if you have routing there (as it can be a network issue)

%sh
telenet  <mongodb_server_name> 28017

Vivian_Wilfred
Honored Contributor
Honored Contributor

Hi @Abel Martinez​ 

Validate the connection between cluster and MongoDB server. Run a nc to the URL and see if the connection is working for MongoDB port (27017) ?

%sh nc -zv <url> <port>
%sh curl -vvv URL

If there are no issues on the networking side please share the complete error message to check further.

Aviral-Bhardwaj
Esteemed Contributor III

there can be 3 things

1-Your firewall is blocking its IP

2-Your Logs are getting huge Traffic

3-This can be databricks deployment in your workspace

jose_gonzalez
Moderator
Moderator

Do you see any error message in your driver logs? if you do, please share the information here.

Abel_Martinez
Contributor

Hi all, thanks for your answers.

This is not a connection issue as the same code, with the same IPs and ports is working using format("com.mongodb.spark.sql.DefaultSource") but it doesn't work using format("mongodb")

Here you have the error message in the driver logs...

23/01/09 10:01:19 INFO cluster: Exception in monitor thread while connecting to server localhost:27017
com.mongodb.MongoSocketOpenException: Exception opening socket
	at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)
	at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:180)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:188)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:152)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:613)
	at com.mongodb.internal.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:107)
	at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:79)
	at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
	... 4 more
23/01/09 10:01:49 WARN JupyterDriverLocal: User code returned error with traceback: [0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-2679801502558508>[0m in [0;36m<cell line: 3>[0;34m()[0m
[1;32m      2[0m [0;34m[0m[0m
[1;32m      3[0m [0;32mfor[0m [0mrow[0m [0;32min[0m [0msqlDF[0m[0;34m.[0m[0mcollect[0m[0;34m([0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 4[0;31m     [0mgetData[0m[0;34m([0m[0mrow[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m<command-2679801502558507>[0m in [0;36mgetData[0;34m(row)[0m
[1;32m     43[0m [0;34m[0m[0m
[1;32m     44[0m [0;34m[0m[0m
[0;32m---> 45[0;31m     [0mdf[0m [0;34m=[0m [0mspark[0m[0;34m.[0m[0mread[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"mongodb"[0m[0;34m)[0m[0;34m.[0m[0moption[0m[0;34m([0m[0;34m"spark.mongodb.input.uri"[0m[0;34m,[0m[0mjdbcUrl[0m[0;34m)[0m[0;31m [0m[0;31m\[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     46[0m                                      [0;34m.[0m[0moption[0m[0;34m([0m[0;34m"database"[0m[0;34m,[0m [0mrow[0m[0;34m[[0m[0;34m'source_schema'[0m[0;34m][0m[0;34m)[0m[0;31m [0m[0;31m\[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m                                      [0;34m.[0m[0moption[0m[0;34m([0m[0;34m"collection"[0m[0;34m,[0m [0mrow[0m[0;34m[[0m[0;34m'source_table'[0m[0;34m][0m[0;34m)[0m[0;34m.[0m[0mload[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
 
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 48[0;31m                 [0mres[0m [0;34m=[0m [0mfunc[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     49[0m                 logger.log_success(
[1;32m     50[0m                     [0mmodule_name[0m[0;34m,[0m [0mclass_name[0m[0;34m,[0m [0mfunction_name[0m[0;34m,[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m [0;34m-[0m [0mstart[0m[0;34m,[0m [0msignature[0m[0;34m[0m[0;34m[0m[0m
 
[0;32m/databricks/spark/python/pyspark/sql/readwriter.py[0m in [0;36mload[0;34m(self, path, format, schema, **options)[0m
[1;32m    182[0m             [0;32mreturn[0m [0mself[0m[0;34m.[0m[0m_df[0m[0;34m([0m[0mself[0m[0;34m.[0m[0m_jreader[0m[0;34m.[0m[0mload[0m[0;34m([0m[0mself[0m[0;34m.[0m[0m_spark[0m[0;34m.[0m[0m_sc[0m[0;34m.[0m[0m_jvm[0m[0;34m.[0m[0mPythonUtils[0m[0;34m.[0m[0mtoSeq[0m[0;34m([0m[0mpath[0m[0;34m)[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m    183[0m         [0;32melse[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 184[0;31m             [0;32mreturn[0m [0mself[0m[0;34m.[0m[0m_df[0m[0;34m([0m[0mself[0m[0;34m.[0m[0m_jreader[0m[0;34m.[0m[0mload[0m[0;34m([0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m    185[0m [0;34m[0m[0m
[1;32m    186[0m     def json(
 
[0;32m/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py[0m in [0;36m__call__[0;34m(self, *args)[0m
[1;32m   1319[0m [0;34m[0m[0m
[1;32m   1320[0m         [0manswer[0m [0;34m=[0m [0mself[0m[0;34m.[0m[0mgateway_client[0m[0;34m.[0m[0msend_command[0m[0;34m([0m[0mcommand[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m-> 1321[0;31m         return_value = get_return_value(
[0m[1;32m   1322[0m             answer, self.gateway_client, self.target_id, self.name)
[1;32m   1323[0m [0;34m[0m[0m
 
[0;32m/databricks/spark/python/pyspark/sql/utils.py[0m in [0;36mdeco[0;34m(*a, **kw)[0m
[1;32m    194[0m     [0;32mdef[0m [0mdeco[0m[0;34m([0m[0;34m*[0m[0ma[0m[0;34m:[0m [0mAny[0m[0;34m,[0m [0;34m**[0m[0mkw[0m[0;34m:[0m [0mAny[0m[0;34m)[0m [0;34m->[0m [0mAny[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m    195[0m         [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 196[0;31m             [0;32mreturn[0m [0mf[0m[0;34m([0m[0;34m*[0m[0ma[0m[0;34m,[0m [0;34m**[0m[0mkw[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m    197[0m         [0;32mexcept[0m [0mPy4JJavaError[0m [0;32mas[0m [0me[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m    198[0m             [0mconverted[0m [0;34m=[0m [0mconvert_exception[0m[0;34m([0m[0me[0m[0;34m.[0m[0mjava_exception[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
 
[0;32m/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py[0m in [0;36mget_return_value[0;34m(answer, gateway_client, target_id, name)[0m
[1;32m    324[0m             [0mvalue[0m [0;34m=[0m [0mOUTPUT_CONVERTER[0m[0;34m[[0m[0mtype[0m[0;34m][0m[0;34m([0m[0manswer[0m[0;34m[[0m[0;36m2[0m[0;34m:[0m[0;34m][0m[0;34m,[0m [0mgateway_client[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m    325[0m             [0;32mif[0m [0manswer[0m[0;34m[[0m[0;36m1[0m[0;34m][0m [0;34m==[0m [0mREFERENCE_TYPE[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 326[0;31m                 raise Py4JJavaError(
[0m[1;32m    327[0m                     [0;34m"An error occurred while calling {0}{1}{2}.\n"[0m[0;34m.[0m[0;34m[0m[0;34m[0m[0m
[1;32m    328[0m                     format(target_id, ".", name), value)
 
[0;31mPy4JJavaError[0m: An error occurred while calling o604.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]

Abel_Martinez
Contributor

Hi all, finally I found the solution by my self. The problem was with the options, which has changed in the last version.

Working code:

spark.read.format("mongodb").option("spark.mongodb.read.connection.uri" ....

Thanks!

Anonymous
Not applicable

Hello Abel, I´m facing a similar problem. What was the option you've used for the 10.0.5?

Kaniz
Community Manager
Community Manager

Hi @Abel_Martinez, I want to express my gratitude for your effort in selecting the most suitable solution. It's great to hear that your query has been successfully resolved. Thank you for your contribution.




 

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.