Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Why am I getting a connection timeout when connecting to MongoDB using MongoDB Connector for Spark 10.x from Databricks?

Abel_Martinez
Contributor

I'm able to connect to MongoDB using org.mongodb.spark:mongo-spark-connector_2.12:3.0.2 and this code:

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", jdbcUrl)

It works well, but if I install the latest MongoDB Spark Connector, version 10.0.5, and try to connect using:

df = spark.read.format("mongodb").option("spark.mongodb.input.uri",jdbcUrl)

In that case the connection to the MongoDB server times out. Any ideas? I need to work with the 10.x libraries because they allow structured streaming from MongoDB.

1 ACCEPTED SOLUTION

Abel_Martinez
Contributor

Hi all, I finally found the solution myself. The problem was with the option names, which changed in the latest version.

Working code:

spark.read.format("mongodb").option("spark.mongodb.read.connection.uri" ....

Thanks!
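For anyone migrating several notebooks, the renaming described above can be sketched as a small helper. This is only an illustration: `LEGACY_TO_V10` and `migrate_options` are hypothetical names, not part of the connector, and the mapping covers just the two URI keys (the read key from this thread and its write counterpart).

```python
# Hypothetical helper, not part of the MongoDB Spark Connector:
# rename legacy (3.x) URI option keys to the names used by 10.x.
LEGACY_TO_V10 = {
    "spark.mongodb.input.uri": "spark.mongodb.read.connection.uri",
    "spark.mongodb.output.uri": "spark.mongodb.write.connection.uri",
}


def migrate_options(options):
    """Return a copy of `options` with legacy keys renamed; other keys pass through."""
    return {LEGACY_TO_V10.get(key, key): value for key, value in options.items()}


# Placeholder values, as in the snippets elsewhere in this thread.
legacy = {
    "spark.mongodb.input.uri": "mongodb://<host>:27017",
    "database": "database_name",
    "collection": "collection_name",
}
migrated = migrate_options(legacy)
print(migrated)

# In a notebook, the result could then be passed to the reader, for example:
# df = spark.read.format("mongodb").options(**migrated).load()
```

Keeping the mapping in one dictionary means a forgotten legacy key is fixed in a single place instead of in every read call.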

9 REPLIES

Hubert-Dudek
Esteemed Contributor III

Please run the %sh magic command in the notebook and check whether the cluster can route to the MongoDB server (as it can be a network issue):

%sh
telnet <mongodb_server_name> 27017

Vivian_Wilfred
Databricks Employee

Hi @Abel Martinez

Validate the connection between the cluster and the MongoDB server. Run nc against the server URL and check whether the connection works on the MongoDB port (27017):

%sh nc -zv <url> <port>
%sh curl -vvv URL

If there are no issues on the networking side, please share the complete error message so we can check further.
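If nc or telnet is not available on the cluster, roughly the same check can be done from a Python cell with the standard library. This is a minimal sketch: `can_connect` is a hypothetical helper name, and the host and port in the commented call are placeholders.

```python
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host name and opens a TCP socket.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


# Example call from a notebook cell (placeholder host name):
# can_connect("<mongodb_server_name>", 27017)
```

A True result only shows that the port is reachable from the driver node; it says nothing about authentication or about which options the connector actually received.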

Aviral-Bhardwaj
Esteemed Contributor III

There can be three causes:

1- Your firewall is blocking the IP

2- Your logs are getting huge traffic

3- It can be an issue with the Databricks deployment in your workspace

jose_gonzalez
Databricks Employee

Do you see any error message in your driver logs? If you do, please share it here.

Abel_Martinez
Contributor

Hi all, thanks for your answers.

This is not a connection issue: the same code, with the same IPs and ports, works with format("com.mongodb.spark.sql.DefaultSource") but not with format("mongodb").

Here is the error message from the driver logs:

23/01/09 10:01:19 INFO cluster: Exception in monitor thread while connecting to server localhost:27017
com.mongodb.MongoSocketOpenException: Exception opening socket
	at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)
	at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:180)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:188)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:152)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:613)
	at com.mongodb.internal.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:107)
	at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:79)
	at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
	... 4 more
23/01/09 10:01:49 WARN JupyterDriverLocal: User code returned error with traceback: ---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-2679801502558508> in <cell line: 3>()
      2 
      3 for row in sqlDF.collect():
----> 4     getData(row)

<command-2679801502558507> in getData(row)
     43 
     44 
---> 45     df = spark.read.format("mongodb").option("spark.mongodb.input.uri",jdbcUrl) \
     46                                      .option("database", row['source_schema']) \
     47                                      .option("collection", row['source_table']).load()

/databricks/spark/python/pyspark/instrumentation_utils.py in wrapper(*args, **kwargs)
     46             start = time.perf_counter()
     47             try:
---> 48                 res = func(*args, **kwargs)
     49                 logger.log_success(
     50                     module_name, class_name, function_name, time.perf_counter() - start, signature

/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    182             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    183         else:
--> 184             return self._df(self._jreader.load())
    185 
    186     def json(

/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    194     def deco(*a: Any, **kw: Any) -> Any:
    195         try:
--> 196             return f(*a, **kw)
    197         except Py4JJavaError as e:
    198             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o604.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
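
One detail in the log above is worth checking whenever this timeout appears: the driver reports address=localhost:27017, not the server from jdbcUrl. That is consistent with the connector not recognizing the option key and falling back to a default local address. A rough sketch of pulling the reported addresses out of such a message follows; `reported_servers` is a hypothetical helper, and the message is shortened from the log above.

```python
import re


def reported_servers(message):
    """Extract the host:port values listed in the driver's cluster-state summary."""
    return re.findall(r"address=([^,\s}]+)", message)


# Shortened copy of the MongoTimeoutException message from the driver log.
message = (
    "com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. "
    "Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27017, "
    "type=UNKNOWN, state=CONNECTING}]"
)

servers = reported_servers(message)
print(servers)  # ['localhost:27017']

# A localhost address on a cluster usually means the URI option never reached the driver.
if any(server.startswith("localhost:") for server in servers):
    print("Driver is targeting localhost; check the name of the connection URI option.")
```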

Anonymous
Not applicable

Hello Abel, I'm facing a similar problem. Which option did you use for 10.0.5?

ravisharma1024
New Contributor II

I was facing the same issue. It is now resolved, thanks to @Abel_Martinez.

I am using code like the following:

df = spark.read.format("mongodb") \
.option('spark.mongodb.read.connection.uri', "mongodb+srv://*****:*****@******/?retryWrites=true&w=majority&appName=****&tls=true") \
.option('database', 'database_name') \
.option('collection', 'collection_name') \
.load()


