
Unable to connect mongo with Databricks

InTimetec
New Contributor II

Hello,

I am trying to connect MongoDB to Databricks. I am also using an SSL certificate.

I created my own cluster and installed the Maven library org.mongodb.spark:mongo-spark-connector_2.12:3.0.1.

This is my code:

 

connectionString = f"mongodb://{secret['user']}:{secret['password']}@{secret['host']}:{secret['port']}/?authSource={secret['database']}&tls=true&tlsCAFile=temp/CA-certificate.pem"

df = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("database", database)\
    .option("collection", collection)\
    .option("spark.mongodb.input.uri", connectionString)\
    .option("ssl", "true")\
    .option("sslCertificate", sslCertificateFilePath)\
    .load()
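
A side note on the connection string: MongoDB requires reserved characters in the username and password (such as `@`, `:`, `/`, or `%`) to be percent-encoded. A minimal sketch follows, assuming credentials may contain such characters; the `secret` values here are placeholders, not the real settings from this post.

```python
from urllib.parse import quote_plus

# Placeholder credentials for illustration only (hypothetical values).
secret = {
    "user": "app_user",
    "password": "p@ss:word/1",
    "host": "example-host",
    "port": 27017,
    "database": "admin",
}

# Percent-encode the user and password so reserved characters do not
# break the URI structure.
connectionString = (
    f"mongodb://{quote_plus(secret['user'])}:{quote_plus(secret['password'])}"
    f"@{secret['host']}:{secret['port']}/?authSource={secret['database']}&tls=true"
)
print(connectionString)
```

If the credentials contain only letters and digits, the encoded string is identical to the unencoded one, so this step is harmless either way.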

 

When I run the code above, I get the following error:

[Screenshot of the error: InTimetec_0-1712295715248.png]

Please suggest a solution.

Thanks

 

4 REPLIES

shan_chandra
Databricks Employee

@InTimetec - can you please check whether the SSL certificate is actually present at the sslCertificateFilePath mentioned above, for example by listing that path?
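
One way to run that check from a notebook cell is sketched below. It assumes the certificate is a PEM file readable from the driver's local filesystem; the helper name `describe_certificate` is made up for illustration and is not part of any Databricks or MongoDB API.

```python
import os

def describe_certificate(path):
    """Return basic facts about a certificate file, without parsing it."""
    info = {"path": os.path.abspath(path), "exists": os.path.isfile(path)}
    if info["exists"]:
        info["size_bytes"] = os.path.getsize(path)
        # PEM files are text and start with a "-----BEGIN ..." header line.
        with open(path, "r", errors="replace") as handle:
            first_line = handle.readline().strip()
        info["looks_like_pem"] = first_line.startswith("-----BEGIN")
    return info

# The relative path used in the connection string from the question.
print(describe_certificate("temp/CA-certificate.pem"))
```

A relative path such as temp/CA-certificate.pem is resolved against the current working directory of the process, so printing the absolute path shows where the file is really being looked up. This check only covers the driver; the file may also need to be present on the worker nodes.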

Yes, I verified. The path is correct.

InTimetec
New Contributor II

@Retired_mod I updated my code as follows:

df = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("database", database)\
    .option("collection", collection)\
    .option("spark.mongodb.input.uri", connectionString)\
    .option("tlsUseSystemCA", "true")\
    .load()

Now I am getting the error below:

Error: An error occurred while calling o516.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=a-coe-aws-mongo-db.cluster-cubeteznsgeb.us-west-2.docdb.amazonaws.com:27017, type=UNKNOWN, state=CONNECTING}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:177)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:147)
	at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:98)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:278)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:182)
	at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:194)
	at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:163)
	at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:158)
	at com.mongodb.spark.MongoConnector.$anonfun$hasSampleAggregateOperator$1(MongoConnector.scala:234)
	at com.mongodb.spark.MongoConnector.$anonfun$withDatabaseDo$1(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
	at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
	at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:234)
	at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator$lzycompute(MongoRDD.scala:221)
	at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator(MongoRDD.scala:221)
	at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:68)
	at com.mongodb.spark.sql.DefaultSource.constructRelation(DefaultSource.scala:97)
	at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:390)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:378)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:334)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:334)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:226)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
	at java.lang.Thread.run(Thread.java:750)

I also tried increasing the timeout by adding the option below:

.option("spark.mongodb.input.connectionTimeoutMs", "120000")

But I still get the same error.
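
The 30000 ms in the error message matches the MongoDB Java driver's default server selection timeout of 30 seconds. A minimal sketch of raising it through the connection string is below, assuming the connector hands the URI to the driver unchanged. `serverSelectionTimeoutMS` and `connectTimeoutMS` are standard MongoDB connection string options; the helper name `with_uri_options` and the example URI are hypothetical. A longer timeout helps only when the server is slow to respond, not when it is unreachable.

```python
def with_uri_options(uri, **options):
    """Append key=value options to a MongoDB URI that may already have a query string."""
    separator = "&" if "?" in uri else "?"
    extra = "&".join(f"{key}={value}" for key, value in options.items())
    return f"{uri}{separator}{extra}" if extra else uri

# Hypothetical URI for illustration; substitute the real connection string.
base_uri = "mongodb://user:password@example-host:27017/?authSource=admin&tls=true"
print(with_uri_options(base_uri, serverSelectionTimeoutMS=120000, connectTimeoutMS=120000))
```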

@InTimetec - could you please check whether port 27017 is accessible from the workspace VPC network? Also, please check with your internal network team for any connectivity issues.
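
A quick way to test reachability from a notebook is a plain TCP connection attempt, sketched below with only the Python standard library. The function name `can_connect` is illustrative. The host and port are copied from the error message in this thread. The check runs on the driver node only, so it is an approximation of what the workers see.

```python
import socket

def can_connect(host, port, timeout_seconds=5.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout_seconds):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False

# Host and port taken from the MongoTimeoutException above.
host = "a-coe-aws-mongo-db.cluster-cubeteznsgeb.us-west-2.docdb.amazonaws.com"
print(can_connect(host, 27017))
```

A result of False points to a network-level problem (DNS, routing, security groups, or firewall rules) rather than to TLS or authentication. A result of True only shows that the port is open; the TLS handshake and login can still fail afterwards.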

Per this documented observation - https://community.databricks.com/t5/data-engineering/mongodb-spark-connector-v10-x-read-error-on-dat... - you can use a DBR 13.3 LTS cluster for compatibility.
