Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Unable to communicate with AWS DocumentDB

Elebioda
New Contributor II

Runtime version: 

15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12)
Spark config:
'''
spark.hadoop.datanucleus.fixedDatastore false
spark.driver.extraJavaOptions -Djavax.net.ssl.trustStore=$JAVA_HOME/lib/security/cacerts
spark.hadoop.javax.jdo.option.ConnectionDriverName org.apache.derby.jdbc.EmbeddedDriver
spark.hadoop.javax.jdo.option.ConnectionURL jdbc:derby:memory:myInMemDB;create=true
spark.hadoop.javax.jdo.option.ConnectionUserName hiveuser
spark.hadoop.javax.jdo.option.ConnectionPassword hivepass
spark.hadoop.datanucleus.autoCreateSchema true
spark.hadoop.datanucleus.autoCreateTables true
spark.executor.extraJavaOptions -Djavax.net.ssl.trustStore=$JAVA_HOME/lib/security/cacerts
spark.sql.catalogImplementation hive
'''
 
Cluster Init script:
'''
#!/bin/bash

cat << 'EOF' > /usr/local/share/ca-certificates/myca.crt
-----BEGIN CERTIFICATE-----
MIICrjCCAjSgAwIBAgIRAOxu0I1QuMAhIeszB3fJIlkwCgYIKoZIzj0EAwMwgZYx
CzAJBgNVBAYTAlVTMSIwIAYDVQQKDBlBbWF6b24gV2ViIFNlcnZpY2VzLCBJbmMu
MRMwEQYDVQQLDApBbWF6b24gUkRTMQswCQYDVQQIDAJXQTEvMC0GA1UEAwwmQW1h
em9uIFJEUyB1cy13ZXN0LTIgUm9vdCBDQSBFQ0MzODQgRzExEDAOBgNVBAcMB1Nl
YXR0bGUwIBcNMjEwNTI0MjIwNjU5WhgPMjEyMTA1MjQyMzA2NTlaMIGWMQswCQYD
VQQGEwJVUzEiMCAGA1UECgwZQW1hem9uIFdlYiBTZXJ2aWNlcywgSW5jLjETMBEG
A1UECwwKQW1hem9uIFJEUzELMAkGA1UECAwCV0ExLzAtBgNVBAMMJkFtYXpvbiBS
RFMgdXMtd2VzdC0yIFJvb3QgQ0EgRUNDMzg0IEcxMRAwDgYDVQQHDAdTZWF0dGxl
MHYwEAYHKoZIzj0CAQYFK4EEACIDYgAEz4bylRcGqqDWdP7gQIIoTHdBK6FNtKH1
4SkEIXRXkYDmRvL9Bci1MuGrwuvrka5TDj4b7e+csY0llEzHpKfq6nJPFljoYYP9
uqHFkv77nOpJJ633KOr8IxmeHW5RXgrZo0IwQDAPBgNVHRMBAf8EBTADAQH/MB0G
A1UdDgQWBBQQikVz8wmjd9eDFRXzBIU8OseiGzAOBgNVHQ8BAf8EBAMCAYYwCgYI
KoZIzj0EAwMDaAAwZQIwf06Mcrpw1O0EBLBBrp84m37NYtOkE/0Z0O+C7D41wnXi
EQdn6PXUVgdD23Gj82SrAjEAklhKs+liO1PtN15yeZR1Io98nFve+lLptaLakZcH
+hfFuUtCqMbaI8CdvJlKnPqT
-----END CERTIFICATE-----
EOF

update-ca-certificates

PEM_FILE="/etc/ssl/certs/myca.pem"
PASSWORD="changeit"
JAVA_HOME=$(readlink -f /usr/bin/java | sed "s:bin/java::")
KEYSTORE="$JAVA_HOME/lib/security/cacerts"

CERTS=$(grep 'END CERTIFICATE' $PEM_FILE| wc -l)

# To process multiple certs with keytool, you need to extract
# each one from the PEM file and import it into the Java KeyStore.

for N in $(seq 0 $(($CERTS - 1))); do
  ALIAS="$(basename $PEM_FILE)-$N"
  echo "Adding to keystore with alias:$ALIAS"
  cat $PEM_FILE |
    awk "n==$N { print }; /END CERTIFICATE/ { n++ }" |
    keytool -noprompt -import -trustcacerts \
      -alias $ALIAS -keystore $KEYSTORE -storepass $PASSWORD
done

echo "export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt" >> /databricks/spark/conf/spark-env.sh
echo "export SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt" >> /databricks/spark/conf/spark-env.sh
'''
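The init script above writes a single certificate to myca.crt and then loops over every certificate found in the resulting PEM file. As a rough sketch of the same extraction step in Python, the snippet below splits a PEM bundle into individual certificates and builds aliases in the `<basename>-<index>` form the script uses. The function names `split_pem_bundle` and `aliases_for` are made up for illustration, and the sample bundle holds dummy bodies, not real certificates.

```python
import re
from typing import List

# Matches one PEM-encoded certificate, including its BEGIN/END markers.
PEM_CERT = re.compile(
    r"-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----",
    re.DOTALL,
)


def split_pem_bundle(pem_text: str) -> List[str]:
    """Return each certificate in a PEM bundle as its own string."""
    return PEM_CERT.findall(pem_text)


def aliases_for(pem_path: str, count: int) -> List[str]:
    """Build aliases the way the init script does: <basename>-<index>."""
    base = pem_path.rsplit("/", 1)[-1]
    return [f"{base}-{n}" for n in range(count)]


if __name__ == "__main__":
    # Placeholder bundle with dummy bodies; replace with real file contents.
    sample = (
        "-----BEGIN CERTIFICATE-----\nAAAA\n-----END CERTIFICATE-----\n"
        "-----BEGIN CERTIFICATE-----\nBBBB\n-----END CERTIFICATE-----\n"
    )
    certs = split_pem_bundle(sample)
    print(len(certs), aliases_for("/etc/ssl/certs/myca.pem", len(certs)))
```

Running this against the file that the init script installs and against the bundle referenced by the application code would show whether both contain the same number of certificates.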
 
Application Code:
'''
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
from pymongo import MongoClient


db_url = "<REDACTED>"
port = "27017"
database = "databricks-temp-db"
collection = "test_collection2"
user = "<REDACTED>"
password = "<REDACTED>"
ca_path = "/Workspace/Users/<REDACTED>/rds-combined-ca-bundle.pem"
db_uri = f"mongodb://{user}:{password}@{db_url}/{database}.{collection}?tls=true&directConnection=true&retryWrites=false"



# db_uri = f"mongodb://{user}:{password}@{db_url}:{port}/{database}.{collection}?tls=true&tlsCAFile={ca_path}&retryWrites=false&replicaSet=rs0"
# spark.stop()
# .config("spark.mongodb.write.connection.uri", db_uri)
# .config("spark.mongodb.write.database", database)
# .config("spark.mongodb.write.collection", collection)
# .config("spark.eventLog.enabled", "true")
# .config("spark.eventLog.dir", "/tmp/spark-events")
spark = SparkSession \
    .builder \
    .appName("TestConnector") \
    .config("packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0") \
    .config("spark.mongodb.read.connection.uri", db_uri) \
    .config("spark.mongodb.write.connection.uri", db_uri) \
    .config("spark.mongodb.ssl.caFile", "/Workspace/Users/<REDACTED>/rds-combined-ca-bundle.pem") \
    .getOrCreate()
test_data = [
    ("Alice", 25),
    ("Bob", 32),
    ("Charlie", 35),
]
columns = ["name", "age"]
df_write = spark.createDataFrame(test_data, columns)
df_write.show()
try:
    df_write.write.format("mongodb") \
        .mode("append") \
        .option("connection.uri", db_uri) \
        .option("database", database) \
        .option("sslCertificate", ca_path) \
        .option("tlsUseSystemCA", "true") \
        .option("collection", collection) \
        .save()
except Exception as e:
    print(e)
'''
 
Problem:
The write fails because the certificate authority cannot be found when connecting to DocumentDB over SSL/TLS.
 
Error:
'''
+-------+---+ | name|age| +-------+---+ | Alice| 25| | Bob| 32| |Charlie| 35| +-------+---+ An error occurred while calling o543.save. : org.apache.spark.SparkException: Writing job failed. at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobFailedError(QueryExecutionErrors.scala:1143) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:509) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:454) at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:314) at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:432) at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:431) at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:314) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.$anonfun$result$2(V2CommandExec.scala:48) at org.apache.spark.sql.execution.SparkPlan.runCommandWithAetherOff(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.runCommandInAetherOrSpark(SparkPlan.scala:191) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.$anonfun$result$1(V2CommandExec.scala:48) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:47) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:45) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:56) at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$5(QueryExecution.scala:385) at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63) at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$4(QueryExecution.scala:385) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:182) at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:385) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$10(SQLExecution.scala:462) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:800) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:334) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:205) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:737) at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:381) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1179) at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:377) at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:327) at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:374) at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:349) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:505) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:85) at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:505) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:40) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:379) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:375) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:40) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:40) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:481) at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:349) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:436) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:349) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:286) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:283) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:440) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1043) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397) at py4j.Gateway.invoke(Gateway.java:306) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199) at py4j.ClientServerConnection.run(ClientServerConnection.java:119) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 8.0 failed 4 times, most recent failure: Lost task 1.3 in stage 8.0 (TID 48) (10.0.3.196 executor 0): com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN, servers=[{address=<REDACTED>:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketWriteException: Exception sending message}, caused by {javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target}, caused by {sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target}, caused by {sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target}}] at com.mongodb.internal.connection.BaseCluster.createAndLogTimeoutException(BaseCluster.java:392) at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:148) at com.mongodb.internal.connection.SingleServerCluster.selectServer(SingleServerCluster.java:46) at com.mongodb.internal.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:126) at com.mongodb.client.internal.ClientSessionBinding.getConnectionSource(ClientSessionBinding.java:128) at 
com.mongodb.client.internal.ClientSessionBinding.getWriteConnectionSource(ClientSessionBinding.java:102) at com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:141) at com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:122) at com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$execute$3(MixedBulkWriteOperation.java:188) at com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$decorateWriteWithRetries$0(MixedBulkWriteOperation.java:146) at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67) at com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:207) at com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:77) at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:173) at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:449) at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:430) at com.mongodb.spark.sql.connector.write.MongoDataWriter.writeModels(MongoDataWriter.java:200) at com.mongodb.spark.sql.connector.write.MongoDataWriter.commit(MongoDataWriter.java:107) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$7(WriteToDataSourceV2Exec.scala:564) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1561) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:598) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:523) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:621) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:484) at 
org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:224) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:134) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:102) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:110) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1045) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:932) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Suppressed: com.mongodb.spark.sql.connector.exceptions.DataException: Write aborted for: PartitionId: 1, TaskId: 48. Manual data clean up may be required. 
at com.mongodb.spark.sql.connector.write.MongoDataWriter.abort(MongoDataWriter.java:121) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$14(WriteToDataSourceV2Exec.scala:592) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1572) ... 27 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.$anonfun$failJobAndIndependentStages$1(DAGScheduler.scala:4018) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:4016) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3930) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3917) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3917) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1766) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1749) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1749) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4277) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4179) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4165) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:55) at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1412) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1400) at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:3157) at org.apache.spark.SparkContext.runJob(SparkContext.scala:3138) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:481) ... 60 more Suppressed: com.mongodb.spark.sql.connector.exceptions.DataException: Write aborted for: f169b679-1353-405c-9b14-9d5e526c7c7c. 1/4 tasks completed. at com.mongodb.spark.sql.connector.write.MongoBatchWrite.abort(MongoBatchWrite.java:91) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:503) ... 60 more Caused by: com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN, servers=[{address=<REDACTED>:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketWriteException: Exception sending message}, caused by {javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target}, caused by {sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target}, caused by {sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target}}] at com.mongodb.internal.connection.BaseCluster.createAndLogTimeoutException(BaseCluster.java:392) at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:148) at com.mongodb.internal.connection.SingleServerCluster.selectServer(SingleServerCluster.java:46) at com.mongodb.internal.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:126) at 
com.mongodb.client.internal.ClientSessionBinding.getConnectionSource(ClientSessionBinding.java:128) at com.mongodb.client.internal.ClientSessionBinding.getWriteConnectionSource(ClientSessionBinding.java:102) at com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:141) at com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:122) at com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$execute$3(MixedBulkWriteOperation.java:188) at com.mongodb.internal.operation.MixedBulkWriteOperation.lambda$decorateWriteWithRetries$0(MixedBulkWriteOperation.java:146) at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67) at com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:207) at com.mongodb.internal.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:77) at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:173) at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:449) at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:430) at com.mongodb.spark.sql.connector.write.MongoDataWriter.writeModels(MongoDataWriter.java:200) at com.mongodb.spark.sql.connector.write.MongoDataWriter.commit(MongoDataWriter.java:107) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$7(WriteToDataSourceV2Exec.scala:564) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1561) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:598) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:523) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:621) at 
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:484) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:224) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:134) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:102) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:110) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1045) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:932) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Suppressed: com.mongodb.spark.sql.connector.exceptions.DataException: Write aborted for: PartitionId: 1, TaskId: 48. 
Manual data clean up may be required. at com.mongodb.spark.sql.connector.write.MongoDataWriter.abort(MongoDataWriter.java:121) at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$14(WriteToDataSourceV2Exec.scala:592) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1572) ... 27 more
'''
 
2 REPLIES

Walter_C
Databricks Employee

The error occurs while a Spark job is writing data to a MongoDB data source, as indicated by the com.mongodb.spark.sql.connector.exceptions.DataException. The message shows that the write operation was aborted for a specific partition and task, and that manual data cleanup may be required.

Here are some steps you can take to troubleshoot and resolve this issue:

  1. Check MongoDB Connection and Configuration: Ensure that the MongoDB connection details and configurations are correct. Verify that the MongoDB server is running and accessible from the Spark cluster.

  2. Review Data Schema and Types: Ensure that the data being written to MongoDB matches the expected schema and data types. Any discrepancies in the schema or data types can cause write failures.

  3. Check for Data Skew: Data skew can cause certain partitions to have significantly more data than others, leading to task failures. Review the data distribution and consider repartitioning the data to balance the load.

  4. Increase Resources: If the task is failing due to resource constraints, consider increasing the resources allocated to the Spark job, such as executor memory and cores.
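Step 1 could be checked with something like the sketch below. It assumes `pymongo` is installed on the cluster (the original notebook already imports it), and the helper names `build_documentdb_uri` and `ping` are hypothetical. Note that this only exercises Python's TLS stack on the driver; it does not touch the JVM truststore that the Spark connector relies on, so a successful ping here does not guarantee that the Spark write will succeed.

```python
from urllib.parse import quote_plus


def build_documentdb_uri(user, password, host, port=27017):
    """Assemble a MongoDB URI, percent-escaping the credentials."""
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/?tls=true&retryWrites=false"
    )


def ping(uri, ca_path, timeout_ms=5000):
    """Return True if the server answers a ping over TLS using ca_path."""
    # Imported here so the URI helper works even without pymongo installed.
    from pymongo import MongoClient
    from pymongo.errors import PyMongoError

    client = None
    try:
        client = MongoClient(
            uri, tlsCAFile=ca_path, serverSelectionTimeoutMS=timeout_ms
        )
        client.admin.command("ping")
        return True
    except PyMongoError as exc:
        print(f"Connection check failed: {exc}")
        return False
    finally:
        if client is not None:
            client.close()
```

A call such as `ping(build_documentdb_uri(user, password, db_url), ca_path)` would reuse the variables from the application code above.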

Elebioda
New Contributor II

I'm not sure that is the problem. The error from the job stage states:

'''

com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN, servers=[{address=<<REDACTED>>:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketWriteException: Exception sending message}, caused by {javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target}, caused by {sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target}, caused by {sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target}}]

'''
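To make the chain of causes in that message easier to read, here is a small sketch that pulls out the exception class names in order. The helper `cause_chain` is hypothetical, and the sample string is the relevant part of the message quoted above. The last entry is the innermost cause.

```python
import re


def cause_chain(message):
    """Return the exception class names listed in a cluster-state message."""
    # Grab the 'exception={...}' entry and every 'caused by {...}' entry.
    entries = re.findall(r"(?:exception=|caused by )\{([^{}]*)\}", message)
    # Each entry looks like '<class name>: <detail>'; keep the class name.
    return [entry.split(":", 1)[0].strip() for entry in entries]


sample = (
    "exception={com.mongodb.MongoSocketWriteException: Exception sending message}, "
    "caused by {javax.net.ssl.SSLHandshakeException: PKIX path building failed: "
    "sun.security.provider.certpath.SunCertPathBuilderException: "
    "unable to find valid certification path to requested target}, "
    "caused by {sun.security.validator.ValidatorException: PKIX path building failed: "
    "sun.security.provider.certpath.SunCertPathBuilderException: "
    "unable to find valid certification path to requested target}, "
    "caused by {sun.security.provider.certpath.SunCertPathBuilderException: "
    "unable to find valid certification path to requested target}"
)

if __name__ == "__main__":
    for name in cause_chain(sample):
        print(name)
```

Every entry after the socket write exception is a certificate validation failure raised inside the JVM, which points at the truststore and not at schema, skew, or resources.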

 

I'd also like to point out that I verified the connection from inside the node:

'''

%sh openssl s_client -connect <REDACTED>:27017 -CAfile /Workspace/Users/<REDACTED>/rds-combined-ca-bundle.pem

'''
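The same check can be sketched with Python's standard `ssl` module, for anyone who prefers to run it from a notebook cell. The function names `build_tls_context` and `handshake_ok` are made up for this example. Like the `openssl` command, it validates the server against an explicit CA file, so it confirms that the bundle matches the server's chain. It says nothing about the JVM, which validates against its own truststore (the file named by `javax.net.ssl.trustStore`, or the default `cacerts`).

```python
import socket
import ssl


def build_tls_context(ca_path=None):
    """Create a verifying TLS context; ca_path=None uses the system CAs."""
    return ssl.create_default_context(cafile=ca_path)


def handshake_ok(host, port, ca_path=None, timeout=5.0):
    """Return True if a verified TLS handshake with host:port succeeds."""
    context = build_tls_context(ca_path)
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            # wrap_socket performs the handshake and certificate validation.
            with context.wrap_socket(sock, server_hostname=host):
                return True
    except (ssl.SSLError, OSError) as exc:
        print(f"TLS handshake failed: {exc}")
        return False
```

Calling `handshake_ok(db_url, 27017, ca_path)` once with the bundle and once with `ca_path=None` would show whether the system CA store updated by the init script also trusts the server.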
