08-12-2022 09:37 AM
I am trying to connect my Spark cluster to a PostgreSQL RDS instance. This is the Python notebook code I used:
df = (spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://<connection-string>:5432/database")
    .option("dbtable", "<schema.table>")
    .option("user", "postgres")
    .option("password", "Pass*****")
    .load()
)
I received the following error message:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-1164003892694289> in <module>
----> 1 df = ( spark.read \
2 .format("jdbc") \
3 .option("url", "jdbc:postgresql://<connection-string>:5432/database") \
4 .option("dbtable", "<schema.table>") \
5 .option("user", "postgres") \
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
162 return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
163 else:
--> 164 return self._df(self._jreader.load())
165
166 def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
115 def deco(*a, **kw):
116 try:
--> 117 return f(*a, **kw)
118 except py4j.protocol.Py4JJavaError as e:
119 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o1170.load.
: org.postgresql.util.PSQLException: The connection attempt failed.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:315)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:51)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)
at org.postgresql.Driver.makeConnection(Driver.java:465)
at org.postgresql.Driver.connect(Driver.java:264)
at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:69)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:241)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:36)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:385)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:356)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:323)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:323)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:222)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at org.postgresql.core.PGStream.createSocket(PGStream.java:231)
at org.postgresql.core.PGStream.<init>(PGStream.java:95)
at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:98)
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:213)
... 28 more
Any help or direction would be greatly appreciated. Thank you.
Please note that I changed the connection URL and other details in this post for security reasons; these are not the values I actually entered.
08-12-2022 09:56 AM
"Caused by: java.net.SocketTimeoutException: connect timed out" indicates that the network connection between the Databricks cluster and the Postgres database on port 5432 was never established and eventually timed out.
As a first step, please make sure the Databricks cluster can actually reach the Postgres host on port 5432.
To test connectivity, you can run the following in a notebook or the web terminal:
%sh nc -v <postgres host> 5432
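If nc is not available on the cluster image, the same TCP reachability test can be run from a Python cell. This is a minimal sketch using only the standard library's socket module; can_connect is a hypothetical helper name, and, like the %sh command, it runs on the driver node only.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host name and performs the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts (socket.timeout subclasses OSError), refusals and DNS failures
        return False


# Hypothetical usage: replace the placeholder with your RDS endpoint
# print(can_connect("<postgres host>", 5432))
```

A False result here corresponds to the "connect timed out" seen in the stack trace: the problem is in the network path (routing, security groups), not in the Spark or JDBC options.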
08-15-2022 07:56 AM
Thank you, Rohit, for your response. Running %sh nc -v <postgres host> 5432 returns nothing and eventually times out, which means there is no connectivity to Postgres. Can you point me to where and how I can establish connectivity to Postgres? Where in Databricks do I allow port 5432? Is it in the IAM instance profile? Any advice would help.
08-15-2022 12:14 PM
OK, I resolved this problem by creating a VPC peering connection between the Databricks VPC and my own VPC that hosts my PostgreSQL instance: https://docs.databricks.com/administration-guide/cloud-configurations/aws/vpc-peering.html?_ga=2.423....
I can now create a DataFrame from my PostgreSQL RDS instance using the same notebook Python code shown above.
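As a hedged sketch, the reader options from the original question can be assembled by a small helper (postgres_jdbc_options is a hypothetical name) that also sets the PostgreSQL JDBC driver's connectTimeout URL parameter (in seconds), so a blocked network path fails after a bounded wait, and that takes the password as an argument so it can come from a Databricks secret instead of being hardcoded. The secret scope and key names in the usage comment are placeholders.

```python
def postgres_jdbc_options(host: str, database: str, table: str, user: str,
                          password: str, port: int = 5432,
                          connect_timeout_s: int = 10) -> dict:
    """Build options for Spark's built-in "jdbc" source (hypothetical helper)."""
    # connectTimeout is a PostgreSQL JDBC driver URL parameter, expressed in seconds
    url = f"jdbc:postgresql://{host}:{port}/{database}?connectTimeout={connect_timeout_s}"
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        # Name the driver class explicitly rather than relying on URL-based lookup
        "driver": "org.postgresql.Driver",
    }


# Usage in a Databricks notebook (spark and dbutils are provided by the runtime):
# opts = postgres_jdbc_options("<connection-string>", "database", "<schema.table>",
#                              "postgres", dbutils.secrets.get(scope="<scope>", key="<key>"))
# df = spark.read.format("jdbc").options(**opts).load()
```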
08-15-2022 12:22 PM
Glad to hear the issue is resolved.
Yes, connectivity must be established from the VPC in your AWS account where the Databricks clusters run to the Postgres instance.
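One prerequisite worth checking before creating the peering connection: AWS does not allow peering two VPCs whose CIDR blocks match or overlap. This is a minimal sketch of that check using only Python's ipaddress module; overlapping_cidrs is a hypothetical helper, and the CIDR values in the usage comment are made-up examples (use the blocks shown for each VPC in the AWS console). After peering, both VPCs' route tables still need routes to each other's CIDR through the peering connection.

```python
import ipaddress
from itertools import product


def overlapping_cidrs(vpc_a_cidrs, vpc_b_cidrs):
    """Return (a, b) pairs of CIDR blocks that overlap between two VPCs."""
    nets_a = [ipaddress.ip_network(c) for c in vpc_a_cidrs]
    nets_b = [ipaddress.ip_network(c) for c in vpc_b_cidrs]
    return [
        (str(a), str(b))
        for a, b in product(nets_a, nets_b)
        # Only compare blocks of the same IP version
        if a.version == b.version and a.overlaps(b)
    ]


# Hypothetical usage with example CIDRs:
# print(overlapping_cidrs(["10.0.0.0/16"], ["172.31.0.0/16"]))
```

An empty list means the two VPCs can be peered as far as addressing is concerned.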
09-01-2022 11:09 AM
Hi @Rohit Rajendran and @charles okoh, I was able to set up VPC peering between my customer's Databricks and Redshift accounts.
However, when I attempt to load data from Redshift, I get the error below:
Py4JJavaError: An error occurred while calling o486.load.
: java.sql.SQLException: The connection attempt failed.
at com.amazon.redshift.util.RedshiftException.getSQLException(RedshiftException.java:56)
at com.amazon.redshift.Driver.connect(Driver.java:339)
at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:46)
at com.databricks.spark.redshift.JDBCWrapper.getConnector(RedshiftJDBCWrapper.scala:355)
at com.databricks.spark.redshift.JDBCWrapper.getConnector(RedshiftJDBCWrapper.scala:376)
at com.databricks.spark.redshift.RedshiftRelation.$anonfun$schema$1(RedshiftRelation.scala:76)
at scala.Option.getOrElse(Option.scala:189)
at com.databricks.spark.redshift.RedshiftRelation.schema$lzycompute(RedshiftRelation.scala:73)
at com.databricks.spark.redshift.RedshiftRelation.schema(RedshiftRelation.scala:72)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:496)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:356)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:323)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:323)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:222)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:607)
at com.amazon.redshift.core.RedshiftStream.<init>(RedshiftStream.java:86)
at com.amazon.redshift.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:111)
at com.amazon.redshift.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:224)
at com.amazon.redshift.core.ConnectionFactory.openConnection(ConnectionFactory.java:51)
at com.amazon.redshift.jdbc.RedshiftConnectionImpl.<init>(RedshiftConnectionImpl.java:322)
at com.amazon.redshift.Driver.makeConnection(Driver.java:502)
at com.amazon.redshift.Driver.connect(Driver.java:315)
09-01-2022 11:39 AM
Maybe @Prabakar Ammeappin or @Debayan Mukherjee have some insight here?
09-01-2022 12:05 PM
@kennyg To test connectivity to Redshift, please use the nc command. From the exception, it looks like the Spark nodes cannot establish a connection to Redshift on port 5439:
%sh nc -v <redshift host> 5439
09-01-2022 12:12 PM
@Rohit Rajendran I received this message: nc: connect to <redshift hostname> port 5439 (tcp) failed: Connection timed out.
Please advise
09-01-2022 12:16 PM
Please make sure the connection from the Databricks VPC to Redshift works. A good place to start is the inbound/outbound rules of the Databricks default security group and of the Redshift security group.
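The inbound-rule check can also be scripted. This is a hedged sketch: allows_inbound is a hypothetical helper that inspects the IpPermissions list returned by boto3's EC2 describe_security_groups call and reports whether TCP traffic on a port is admitted from a given source security group or source IP. It only covers the target group's inbound rules; route tables, network ACLs and the source group's outbound rules must also allow the traffic.

```python
import ipaddress


def allows_inbound(ip_permissions, port, source_group_id=None, source_ip=None):
    """Return True if any inbound rule admits TCP `port` from the given source."""
    for rule in ip_permissions:
        proto = rule.get("IpProtocol")
        if proto == "-1":
            # "-1" means all protocols and all ports
            port_ok = True
        elif proto in ("tcp", "6"):
            port_ok = rule.get("FromPort", 0) <= port <= rule.get("ToPort", 65535)
        else:
            continue
        if not port_ok:
            continue
        # Rules that reference another security group as the source
        groups = {pair.get("GroupId") for pair in rule.get("UserIdGroupPairs", [])}
        if source_group_id is not None and source_group_id in groups:
            return True
        # Rules that use an IPv4 CIDR as the source
        if source_ip is not None:
            addr = ipaddress.ip_address(source_ip)
            for ip_range in rule.get("IpRanges", []):
                if addr in ipaddress.ip_network(ip_range["CidrIp"]):
                    return True
    return False


# Hypothetical usage (IDs and region are placeholders):
# import boto3
# ec2 = boto3.client("ec2", region_name="<region>")
# sg = ec2.describe_security_groups(GroupIds=["<redshift-sg-id>"])["SecurityGroups"][0]
# print(allows_inbound(sg["IpPermissions"], 5439, source_group_id="<databricks-sg-id>"))
```

As far as I know, referencing a peer VPC's security group only works when both VPCs are in the same Region; for cross-Region peering, a CIDR-based rule (the source_ip path above) is the usual alternative.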
09-01-2022 12:32 PM
@Rohit Rajendran I have confirmed that the Redshift SG has an inbound rule whose source is the security group ID of the Databricks unmanaged security group (step 8 here: https://docs.databricks.com/administration-guide/cloud-configurations/aws/vpc-peering.html?_ga=2.108...)
Please advise
09-02-2022 12:32 AM
Hi @K G, to rule Databricks out, please try creating a VM in the same subnet where Databricks is deployed and check connectivity from there. If it works from the VM, we can look into what is wrong on the Databricks side. If the connectivity test also fails from the VM, the problem is outside Databricks' scope, and we advise getting help from your networking team to sort out the connection.
09-02-2022 04:06 AM
@Prabakar Ammeappin Is there a VM type you recommend? I spun up an Amazon Linux VM and was able to retrieve its private IP with the %sh host -t a <VM hostname> command.
However, I cannot use the %sh nc -v <hostname> <port number> command because I don't know which port number to use.
Please advise
09-02-2022 04:13 AM
Hi @K G, if you are testing the connection to Redshift, @Rohit Rajendran already provided the port number:
nc -v <redshift host> 5439
Spin up an Ubuntu VM in the same subnet where Databricks is deployed and run this command on the VM.
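If nc still times out even though peering and security groups look correct, one thing worth ruling out (an assumption about the setup, not something confirmed in this thread) is how the endpoint resolves. Traffic only uses the peering connection when the hostname resolves to a private address inside the peer VPC's CIDR; a public address is routed out through the internet or NAT gateway instead. AWS peering connections have a DNS option that lets public hostnames of the remote VPC resolve to private IPs. This minimal sketch does roughly what host -t a does, from Python; resolved_addresses is a hypothetical helper.

```python
import ipaddress
import socket


def resolved_addresses(host, port):
    """Resolve `host` to IPv4 addresses and flag whether each one is private."""
    infos = socket.getaddrinfo(host, port, family=socket.AF_INET, type=socket.SOCK_STREAM)
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP
    ips = sorted({info[4][0] for info in infos})
    return [(ip, ipaddress.ip_address(ip).is_private) for ip in ips]


# Hypothetical usage from a Databricks notebook:
# print(resolved_addresses("<redshift host>", 5439))
```

Any address flagged False means the cluster would try to reach the endpoint over a public route rather than through the peering connection.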
09-02-2022 07:28 AM
@Prabakar Ammeappin I worked with the networking team and was able to connect with nc -v <redshift host> 5439. Thank you! I can now read data from a table and from a query. However, when I run:
df.write \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://<redshift hostname>:5439/<database>") \
    .option("user", "<redshift username>") \
    .option("password", "<redshift password>") \
    .option("dbtable", "<dbschema.dbtablename>") \
    .option("tempdir", "s3a://<bucket name>/<bucket path>") \
    .option("forward_spark_s3_credentials", "true") \
    .mode("overwrite") \
    .save()
I get this error:
Py4JJavaError: An error occurred while calling o1357.save.
: java.sql.SQLException: Exception thrown in awaitResult:
at com.databricks.spark.redshift.JDBCWrapper.executeInterruptibly(RedshiftJDBCWrapper.scala:223)
at com.databricks.spark.redshift.JDBCWrapper.executeInterruptibly(RedshiftJDBCWrapper.scala:197)
at com.databricks.spark.redshift.RedshiftWriter.$anonfun$doRedshiftLoad$4(RedshiftWriter.scala:169)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:377)
at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:363)
at com.databricks.spark.util.SparkDatabricksProgressReporter$.withStatusCode(ProgressReporter.scala:34)
at com.databricks.spark.redshift.RedshiftWriter.$anonfun$doRedshiftLoad$3(RedshiftWriter.scala:169)
at com.databricks.spark.redshift.RedshiftWriter.$anonfun$doRedshiftLoad$3$adapted(RedshiftWriter.scala:155)
at scala.Option.foreach(Option.scala:407)
at com.databricks.spark.redshift.RedshiftWriter.doRedshiftLoad(RedshiftWriter.scala:155)
at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:448)
at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:115)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:78)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:89)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:160)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:156)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:575)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:575)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:551)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:156)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:156)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:141)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:186)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:959)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:427)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.redshift.util.RedshiftException: ERROR: Load into table 'ref_cip_detailed_series_testkg' failed. Check 'stl_load_errors' system table for details.
at com.amazon.redshift.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2607)
at com.amazon.redshift.core.v3.QueryExecutorImpl.processResultsOnThread(QueryExecutorImpl.java:2275)
at com.amazon.redshift.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1880)
at com.amazon.redshift.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1872)
at com.amazon.redshift.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
at com.amazon.redshift.jdbc.RedshiftStatementImpl.executeInternal(RedshiftStatementImpl.java:514)
at com.amazon.redshift.jdbc.RedshiftStatementImpl.execute(RedshiftStatementImpl.java:435)
at com.amazon.redshift.jdbc.RedshiftPreparedStatement.executeWithFlags(RedshiftPreparedStatement.java:200)
at com.amazon.redshift.jdbc.RedshiftPreparedStatement.execute(RedshiftPreparedStatement.java:184)
at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$1(RedshiftJDBCWrapper.scala:197)
at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$1$adapted(RedshiftJDBCWrapper.scala:197)
at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$2(RedshiftJDBCWrapper.scala:215)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Please advise. Thanks
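The root cause line says the COPY into the target table failed and points at the stl_load_errors system table, whose err_reason column states why each row was rejected. This is a hedged sketch: recent_load_errors_query is a hypothetical helper that builds the query, and the usage comment reads it back through the same connector's query option (the connector unloads results via tempdir, so the S3 settings are reused). Regular users only see their own rows in stl_load_errors; superusers see all rows.

```python
def recent_load_errors_query(limit: int = 10) -> str:
    """Build a query for the most recent rows in Redshift's stl_load_errors table."""
    return (
        "select starttime, filename, line_number, colname, type, col_length, "
        "trim(raw_field_value) as raw_field_value, err_code, "
        "trim(err_reason) as err_reason "
        "from stl_load_errors "
        "order by starttime desc "
        f"limit {int(limit)}"
    )


# Hypothetical usage in the same notebook (placeholders as in the write call above):
# errors = (spark.read
#     .format("com.databricks.spark.redshift")
#     .option("url", "jdbc:redshift://<redshift hostname>:5439/<database>")
#     .option("user", "<redshift username>")
#     .option("password", "<redshift password>")
#     .option("query", recent_load_errors_query())
#     .option("tempdir", "s3a://<bucket name>/<bucket path>")
#     .option("forward_spark_s3_credentials", "true")
#     .load())
# display(errors)
```

One frequent cause with this connector, offered only as a possibility, is string values longer than 256 characters: string columns are created as TEXT by default, which Redshift stores as VARCHAR(256). The colname, col_length and err_reason columns would confirm or rule that out.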