
Connecting Databricks Spark Cluster to Postgresql RDS Instance

Cano
New Contributor III

I am trying to connect my Spark cluster to a PostgreSQL RDS instance. This is the Python notebook code I used:

df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://<connection-string>:5432/database")
    .option("dbtable", "<schema.table>")
    .option("user", "postgres")
    .option("password", "Pass*****")
    .load()
)

I received the following error message:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-1164003892694289> in <module>
----> 1 df = ( spark.read \
      2   .format("jdbc") \
      3   .option("url", "jdbc:postgresql://<connection-string>:5432/database") \
      4   .option("dbtable", "<schema.table>") \
      5   .option("user", "postgres") \
 
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    162             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    163         else:
--> 164             return self._df(self._jreader.load())
    165 
    166     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
 
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
 
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115     def deco(*a, **kw):
    116         try:
--> 117             return f(*a, **kw)
    118         except py4j.protocol.Py4JJavaError as e:
    119             converted = convert_exception(e.java_exception)
 
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)
 
Py4JJavaError: An error occurred while calling o1170.load.
: org.postgresql.util.PSQLException: The connection attempt failed.
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:315)
	at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:51)
	at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)
	at org.postgresql.Driver.makeConnection(Driver.java:465)
	at org.postgresql.Driver.connect(Driver.java:264)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:241)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:385)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:356)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:323)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:323)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:222)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:607)
	at org.postgresql.core.PGStream.createSocket(PGStream.java:231)
	at org.postgresql.core.PGStream.<init>(PGStream.java:95)
	at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:98)
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:213)
	... 28 more

Any help or direction will be greatly appreciated, Thank you.

Please note that I changed the connection URL and other details for security reasons and that's not how they were entered.

15 REPLIES

User16873043099
Contributor

"Caused by: java.net.SocketTimeoutException: connect timed out" indicates that the network connection between the Databricks cluster and the Postgres database on port 5432 was never established and eventually timed out.

As a first step, please make sure the Databricks cluster can reach the Postgres instance over the network.

To test connectivity, run the command below in a notebook or the web terminal:

%sh nc -v <postgres host> 5432
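If nc is not available on the cluster, roughly the same check can be sketched in Python with the standard library. This is a minimal sketch, not part of the original reply: the function name can_connect and the 5-second default timeout are assumptions chosen for illustration.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # OSError covers timeouts, refused connections, and DNS failures.
        return False


# Example call (the host is a placeholder, as in the nc command above):
# can_connect("<postgres host>", 5432)
```

A False result here corresponds to nc timing out or failing: the problem is on the network path, not in the Spark JDBC options.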

Cano
New Contributor III

Thank you, Rohit, for your response. I found that %sh nc -v <postgres host> 5432 does not return anything (it times out), meaning there is no connectivity to Postgres. Can you please direct me to where and how I can establish connectivity to Postgres? Where in Databricks can I allow port 5432? Will it be in the IAM profile? Any advice will help.

Cano
New Contributor III

Okay, so I resolved this problem by creating a VPC peering connection between the Databricks VPC and the VPC that hosts my PostgreSQL instance. https://docs.databricks.com/administration-guide/cloud-configurations/aws/vpc-peering.html?_ga=2.423....

I can now create a DataFrame from my PostgreSQL RDS instance with the same Python notebook code shown above.
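One precondition worth checking before setting up VPC peering is that the two VPCs' IPv4 CIDR blocks do not overlap, since AWS does not allow peering between VPCs with overlapping ranges. The sketch below shows one way to check this with Python's ipaddress module. The CIDR values are hypothetical and are not taken from this thread.

```python
import ipaddress


def cidrs_overlap(cidr_a: str, cidr_b: str) -> bool:
    """Return True if the two CIDR blocks share at least one address."""
    return ipaddress.ip_network(cidr_a).overlaps(ipaddress.ip_network(cidr_b))


# Hypothetical CIDR blocks, for illustration only.
databricks_vpc_cidr = "10.126.0.0/16"
rds_vpc_cidr = "172.31.0.0/16"

if cidrs_overlap(databricks_vpc_cidr, rds_vpc_cidr):
    print("CIDR blocks overlap: these VPCs cannot be peered as-is")
else:
    print("CIDR blocks do not overlap")
```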

Glad to know the issue got resolved.

Yes, connectivity needs to be established from the VPC in the AWS account where the Databricks clusters are hosted to the Postgres instance.

kennyg
New Contributor III

Hi @Rohit Rajendran and @charles okoh, I was able to set up VPC peering for my customer's Databricks and Redshift account.

  • I tested the connection via %sh host -t a <redshift hostname> and confirmed the IP address.
  • I also ran a display(spark.sql("SELECT * FROM csv.`s3://<customer/s3bucketpath>`")) to confirm I can access the S3 bucket.

However when I attempt to run:

df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:redshift://<connection-string>:5439/database")
    .option("dbtable", "<schema.table>")
    .option("user", "customeruser")
    .option("password", "Pass*****")
    .load()
)

I get the error below:

Py4JJavaError: An error occurred while calling o486.load.
: java.sql.SQLException: The connection attempt failed.
	at com.amazon.redshift.util.RedshiftException.getSQLException(RedshiftException.java:56)
	at com.amazon.redshift.Driver.connect(Driver.java:339)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:46)
	at com.databricks.spark.redshift.JDBCWrapper.getConnector(RedshiftJDBCWrapper.scala:355)
	at com.databricks.spark.redshift.JDBCWrapper.getConnector(RedshiftJDBCWrapper.scala:376)
	at com.databricks.spark.redshift.RedshiftRelation.$anonfun$schema$1(RedshiftRelation.scala:76)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.redshift.RedshiftRelation.schema$lzycompute(RedshiftRelation.scala:73)
	at com.databricks.spark.redshift.RedshiftRelation.schema(RedshiftRelation.scala:72)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:496)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:356)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:323)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:323)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:222)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:607)
	at com.amazon.redshift.core.RedshiftStream.<init>(RedshiftStream.java:86)
	at com.amazon.redshift.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:111)
	at com.amazon.redshift.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:224)
	at com.amazon.redshift.core.ConnectionFactory.openConnection(ConnectionFactory.java:51)
	at com.amazon.redshift.jdbc.RedshiftConnectionImpl.<init>(RedshiftConnectionImpl.java:322)
	at com.amazon.redshift.Driver.makeConnection(Driver.java:502)
	at com.amazon.redshift.Driver.connect(Driver.java:315)
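As with the PostgreSQL trace earlier in the thread, the last "Caused by:" entry names the underlying failure. The sketch below pulls that line out of a long Java trace. The helper name root_cause is hypothetical, and the sample string is an abbreviated excerpt of the trace above.

```python
def root_cause(stack_trace: str) -> str:
    """Return the last 'Caused by:' line of a Java stack trace.

    Falls back to the first non-empty line when the trace has no
    'Caused by:' entry, and to an empty string for empty input.
    """
    lines = [line.strip() for line in stack_trace.splitlines() if line.strip()]
    causes = [line for line in lines if line.startswith("Caused by:")]
    if causes:
        return causes[-1]
    return lines[0] if lines else ""


# Abbreviated excerpt of the Redshift trace above.
sample_trace = """
Py4JJavaError: An error occurred while calling o486.load.
: java.sql.SQLException: The connection attempt failed.
    at com.amazon.redshift.Driver.connect(Driver.java:339)
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.net.Socket.connect(Socket.java:607)
"""

print(root_cause(sample_trace))
```

Here the root cause is again a connect timeout, which points at the network path to port 5439 rather than at the credentials or the table name.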

BradSheridan
Valued Contributor

Maybe @Prabakar Ammeappin or @Debayan Mukherjee have some insight here?

@kennyg To test connectivity to Redshift, please use the nc command. From the exception, it looks like the Spark nodes are unable to establish connectivity to Redshift on port 5439.

%sh nc -v <redshift host> 5439

kennyg
New Contributor III

@Rohit Rajendran I received this message: nc: connect to <redshift hostname> port 5439 (tcp) failed: Connection timed out.

Please advise

Please make sure the connection from the Databricks VPC to Redshift works. You can start by looking at the inbound and outbound security group rules for both the Databricks default security group (SG) and the Redshift SG.
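The rule check described above can be sketched as a small pure function. This is only an illustration under stated assumptions: the rule dictionaries follow the IpPermissions shape that boto3's describe_security_groups returns, the group IDs are hypothetical, and CIDR sources are compared by exact string match rather than by containment.

```python
def sg_allows_port(ip_permissions, port, source_group_id=None, source_cidr=None):
    """Return True if any rule admits TCP traffic on `port` from the given source.

    `ip_permissions` is a list of rule dicts in the assumed IpPermissions shape.
    CIDR sources are matched by exact string equality only.
    """
    for rule in ip_permissions:
        protocol = rule.get("IpProtocol")
        if protocol not in ("tcp", "-1"):  # "-1" means all protocols
            continue
        if protocol == "tcp":
            if not (rule.get("FromPort", 0) <= port <= rule.get("ToPort", 65535)):
                continue
        group_ids = {pair.get("GroupId") for pair in rule.get("UserIdGroupPairs", [])}
        cidrs = {ip_range.get("CidrIp") for ip_range in rule.get("IpRanges", [])}
        if source_group_id is not None and source_group_id in group_ids:
            return True
        if source_cidr is not None and source_cidr in cidrs:
            return True
    return False


# Hypothetical inbound rules for the Redshift SG.
redshift_inbound = [
    {
        "IpProtocol": "tcp",
        "FromPort": 5439,
        "ToPort": 5439,
        "UserIdGroupPairs": [{"GroupId": "sg-databricks-example"}],
        "IpRanges": [],
    }
]

print(sg_allows_port(redshift_inbound, 5439, source_group_id="sg-databricks-example"))
```

A matching inbound rule is necessary but not sufficient: route tables and network ACLs on both sides of the peering connection also have to allow the traffic.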

kennyg
New Contributor III

@Rohit Rajendran I have confirmed that the Redshift SG has the inbound rule with the source as the security group ID of the Unmanaged Databricks (step 8 here: https://docs.databricks.com/administration-guide/cloud-configurations/aws/vpc-peering.html?_ga=2.108...)

Please advise

Prabakar
Esteemed Contributor III

Hi @K G, to eliminate Databricks from the scope, please try creating a VM in the same subnet where Databricks is deployed and check the connectivity. If it's working fine, then we can see what is wrong on Databricks' side. If the connectivity test fails from a VM on AWS, then it's out of Databricks scope, and it is advised you get the help of your networking team to sort this connection issue.

kennyg
New Contributor III

@Prabakar Ammeappin Is there a VM type you recommend? I spun up an Amazon Linux VM and was able to retrieve its private IP with the %sh host -t a <VM hostname> command.

I cannot use the %sh nc -v <hostname> <port number> command because it needs a port number.

Please advise

Prabakar
Esteemed Contributor III

Hi @K G, if you are testing the connection to Redshift, then the port number was already provided by @Rohit Rajendran.

nc -v <redshift host> 5439

Spin up an Ubuntu VM in the same subnet where Databricks is deployed and run this command in the VM.
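The two checks used in this exchange test different things: host -t a only confirms that the name resolves, while nc confirms that the port is reachable. A rough Python sketch that separates the two stages is shown below. The function name diagnose and its return labels are hypothetical, chosen for illustration.

```python
import socket


def diagnose(host: str, port: int, timeout: float = 5.0) -> str:
    """Report which stage of connecting to host:port fails.

    Returns one of: "dns_failed", "timed_out", "refused", "unreachable", "ok".
    """
    # Stage 1: name resolution (what `host -t a` checks).
    try:
        address = socket.gethostbyname(host)
    except socket.gaierror:
        return "dns_failed"

    # Stage 2: TCP connection to the port (what `nc -v` checks).
    try:
        with socket.create_connection((address, port), timeout=timeout):
            return "ok"
    except socket.timeout:
        # Typical when a security group or route silently drops packets.
        return "timed_out"
    except ConnectionRefusedError:
        # The host is reachable, but nothing is listening on the port.
        return "refused"
    except OSError:
        return "unreachable"


# Example call (placeholder host, as in the nc command above):
# diagnose("<redshift host>", 5439)
```

A "timed_out" result after a successful lookup matches the situation earlier in the thread: the hostname resolved, but the connection to port 5439 was blocked somewhere along the network path.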

kennyg
New Contributor III

@Prabakar Ammeappin I worked with the networking team and was able to connect using nc -v <redshift host> 5439. Thank you! I can now read data from a table and from a query. However, when I run:

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "<jdbc redshift hostname>") \
  .option("user", "<redshift username>") \
  .option("password", "<redshift password>") \
  .option("dbtable", "<dbschema.dbtablename>") \
  .option("tempdir", "s3a://<name of>/<bucketpath>") \
  .option("forward_spark_s3_credentials", "true") \
  .mode("overwrite") \
  .save()

I get this error:

Py4JJavaError: An error occurred while calling o1357.save.
: java.sql.SQLException: Exception thrown in awaitResult:
	at com.databricks.spark.redshift.JDBCWrapper.executeInterruptibly(RedshiftJDBCWrapper.scala:223)
	at com.databricks.spark.redshift.JDBCWrapper.executeInterruptibly(RedshiftJDBCWrapper.scala:197)
	at com.databricks.spark.redshift.RedshiftWriter.$anonfun$doRedshiftLoad$4(RedshiftWriter.scala:169)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:377)
	at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:363)
	at com.databricks.spark.util.SparkDatabricksProgressReporter$.withStatusCode(ProgressReporter.scala:34)
	at com.databricks.spark.redshift.RedshiftWriter.$anonfun$doRedshiftLoad$3(RedshiftWriter.scala:169)
	at com.databricks.spark.redshift.RedshiftWriter.$anonfun$doRedshiftLoad$3$adapted(RedshiftWriter.scala:155)
	at scala.Option.foreach(Option.scala:407)
	at com.databricks.spark.redshift.RedshiftWriter.doRedshiftLoad(RedshiftWriter.scala:155)
	at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:448)
	at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:115)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:78)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:89)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:160)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:156)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:575)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:575)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:551)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:156)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:156)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:141)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:186)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:959)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:427)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:396)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.redshift.util.RedshiftException: ERROR: Load into table 'ref_cip_detailed_series_testkg' failed. Check 'stl_load_errors' system table for details.
	at com.amazon.redshift.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2607)
	at com.amazon.redshift.core.v3.QueryExecutorImpl.processResultsOnThread(QueryExecutorImpl.java:2275)
	at com.amazon.redshift.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1880)
	at com.amazon.redshift.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1872)
	at com.amazon.redshift.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
	at com.amazon.redshift.jdbc.RedshiftStatementImpl.executeInternal(RedshiftStatementImpl.java:514)
	at com.amazon.redshift.jdbc.RedshiftStatementImpl.execute(RedshiftStatementImpl.java:435)
	at com.amazon.redshift.jdbc.RedshiftPreparedStatement.executeWithFlags(RedshiftPreparedStatement.java:200)
	at com.amazon.redshift.jdbc.RedshiftPreparedStatement.execute(RedshiftPreparedStatement.java:184)
	at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$1(RedshiftJDBCWrapper.scala:197)
	at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$1$adapted(RedshiftJDBCWrapper.scala:197)
	at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$2(RedshiftJDBCWrapper.scala:215)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

Please advise. Thanks
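The error message itself points to the stl_load_errors system table, so a reasonable next step is to look at its most recent rows. The sketch below builds such a query. The helper name recent_load_errors_query is hypothetical, and the selected columns are an assumption about which fields are most useful for spotting the failing column and reason.

```python
def recent_load_errors_query(limit: int = 10) -> str:
    """Build a query for the most recent rows of Redshift's stl_load_errors table."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    return (
        "SELECT starttime, filename, line_number, colname, "
        "raw_field_value, err_code, err_reason "
        "FROM stl_load_errors "
        "ORDER BY starttime DESC "
        f"LIMIT {int(limit)}"
    )


print(recent_load_errors_query(5))
```

The resulting SQL can be run in any Redshift SQL client. The err_reason and colname values for the failed load usually show whether the problem is a type mismatch, a length overflow, or a delimiter issue in the staged files.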
