JDBC connection timeout on workflow cluster
01-09-2023 06:57 AM
Hi there,
I have a batch process configured in a workflow that fails with a JDBC timeout on a Postgres DB.
I checked the JDBC connection configuration and it seems to work: when I query a table and call df.show() in the process, it displays the fetched data. So the issue does not seem to come from there.
I tried a couple of configurations at cluster level, but I still get the same issue.
The conf I tried:
spark.master local[*, 4]
spark.databricks.cluster.profile singleNode
spark.executor.heartbeatInterval 3600s
spark.network.timeout 4000s
Note that, in the same process, there is another connection to a MySQL DB, which works with no noticeable issue.
The DB is hosted on GCP Cloud SQL and our Databricks platform is on GCP as well.
Let me know if you have any hints at the configuration level in Databricks, knowing also that this process currently runs on another VM with a local PySpark.
Hereafter the stack trace I get:
Py4JJavaError: An error occurred while calling o1829.checkpoint.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 58.0 failed 4 times, most recent failure: Lost task 33.3 in stage 58.0 (TID 673) (driver-656749566d-lxcst executor driver): org.postgresql.util.PSQLException: The connection attempt failed.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:331)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)
at org.postgresql.Driver.makeConnection(Driver.java:400)
at org.postgresql.Driver.connect(Driver.java:259)
at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:123)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:277)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
at o
- Labels:
Batch Process
Query Table
Workflow Cluster
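A note on the settings tried above: as far as I know, spark.network.timeout and spark.executor.heartbeatInterval govern Spark's own internal communication, not the connection attempt made by the PostgreSQL JDBC driver, which has its own timeouts. A minimal sketch, assuming the pgJDBC connection parameters connectTimeout and socketTimeout (both in seconds) are passed in the URL. The helper name build_postgres_url, the host, the database name and the timeout values are hypothetical and only for illustration:

```python
from urllib.parse import urlencode


def build_postgres_url(host, port, database, connect_timeout_s=30, socket_timeout_s=300):
    """Build a PostgreSQL JDBC URL that carries driver-level timeouts.

    connectTimeout and socketTimeout are pgJDBC connection parameters,
    both expressed in seconds. The default values here are illustrative.
    """
    params = urlencode({
        "connectTimeout": connect_timeout_s,
        "socketTimeout": socket_timeout_s,
    })
    return f"jdbc:postgresql://{host}:{port}/{database}?{params}"


# Hypothetical host and database, for illustration only.
url = build_postgres_url("10.0.0.5", 5432, "mydb")
print(url)
```

The resulting string could then be used as the url option of the JDBC reader. Whether a longer timeout helps depends on why the connection attempt fails in the first place, so this is something to try rather than a confirmed fix.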
01-09-2023 07:08 AM
Hi @Fred Foucart,
Would it be possible to share the code snippet here for better understanding?
01-09-2023 07:16 AM
Hi @Rama Krishna N ,
I use a class with a get_table_from_db method, as follows:
class ExtractData:
    def __init__(self, db: Enum, **kwargs):
        self.db_name = db.DB_NAME.value
        self.db = db
        self.kwargs = kwargs
        self.spark = init_spark_session(kwargs["app_name"], **kwargs)
        self.loaded_tables = {}

    def get_table_from_db(self, table_name):
        spark = init_spark_session(self.kwargs["app_name"], **self.kwargs)
        return spark.read.format("jdbc").options(**self.kwargs[f"{self.db_name}_options"]) \
            .option("dbtable", table_name).load()
The kwargs contain the connection credentials.
The init_spark_session function is as follows:
from pyspark import SparkConf
from pyspark.sql import SparkSession


def init_spark_session(spark_app_name, **kwargs):
    """
    SQL Spark instantiation
    :param spark_app_name:
    :param kwargs:
    """
    called_method = SparkSession.builder
    conf = SparkConf()
    # Copy every entry of the "spark_config" mapping into the Spark configuration
    for key, value in kwargs["spark_config"].items():
        conf.set(key.replace('"', ''), value)
    return called_method.appName(spark_app_name).config(conf=conf).enableHiveSupport().getOrCreate()
The connection arguments:
mysql_options: {
    host = ${VSA_HOST}
    database = ${VSA_DATABASE}
    url = "jdbc:mysql://"${VSA_options.host}"/"${VSA_options.database}"?zeroDateTimeBehavior=convertToNull"
    driver = "com.mysql.jdbc.Driver"
    user = ${VSA_USER}
    password = ${VSA_PASSWORD}
}
post_options: {
    host = ${HOST}
    port = ${PORT}
    database = ${DATABASE}
    url = "jdbc:postgresql://"${CDB_options.host}":"${CDB_options.port}"/"${CDB_options.database}
    driver = "org.postgresql.Driver"
    user = ${USER}
    password = ${PASSWORD}
}
01-09-2023 11:51 AM
Hi @Fred Foucart,
The above code looks good to me. Can you try the code below as well?
source_df = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{host}/{database}") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", username) \
    .option("password", password) \
    .option("dbtable", "<TableName>") \
    .option("fetchsize", 5000) \
    .load()
In case the table is huge, you can try with parallel reads.
table_name = "<Your Table Name>"
partitionColumn = "<Primary Key Numeric Column>"
lowerBound = 1
upperBound = 10000  # replace with <Total Row Count> (our table contains over a billion rows!)
fetchsize = 1000
num_partitions = 20  # Do some math on how many slices the data should be partitioned into
# and read (Total Records / Partitions), i.e. 10000 / 20 = 500 rows per thread
source_df = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{host}/{database}") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", username) \
    .option("password", password) \
    .option("dbtable", table_name) \
    .option("partitionColumn", partitionColumn) \
    .option("lowerBound", lowerBound) \
    .option("upperBound", upperBound) \
    .option("numPartitions", num_partitions) \
    .option("fetchsize", fetchsize) \
    .load()
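To make the "do some math" step concrete, here is a rough sketch of how a partitioned JDBC read splits the partition column into ranges. It is a simplified approximation, not Spark's exact code: Spark's own stride arithmetic differs in rounding details, and the function name partition_predicates is hypothetical. What it does illustrate is that lowerBound and upperBound only decide the stride and do not filter rows: the first range also picks up NULLs and everything below the lower bound, and the last range is open-ended.

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Return one WHERE clause per partition (simplified stride arithmetic)."""
    if num_partitions <= 1 or lower >= upper:
        return ["1=1"]  # a single partition reads the whole table

    # Never use more partitions than there are distinct values in the range.
    n = min(num_partitions, upper - lower)
    stride = (upper - lower) // n
    cut_points = [lower + stride * i for i in range(1, n)]

    predicates = []
    for i in range(n):
        lo = cut_points[i - 1] if i > 0 else None
        hi = cut_points[i] if i < n - 1 else None
        if lo is None and hi is None:
            predicates.append("1=1")
        elif lo is None:
            # First partition: also catches NULLs and values below the lower bound.
            predicates.append(f"{column} < {hi} OR {column} IS NULL")
        elif hi is None:
            # Last partition: open-ended, catches values above the upper bound.
            predicates.append(f"{column} >= {lo}")
        else:
            predicates.append(f"{column} >= {lo} AND {column} < {hi}")
    return predicates


# Same numbers as in the example above; "id" is a hypothetical column name.
preds = partition_predicates("id", 1, 10000, 20)
print(len(preds))
print(preds[0])
print(preds[-1])
```

Each of these ranges is read by its own task over its own JDBC connection, so numPartitions also caps the number of concurrent connections opened against the database. That may be worth keeping in mind when the failure is a connection attempt rather than a slow query.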
01-11-2023 11:27 PM
I tried all these configurations but still get the same issue.
We need to check the configuration at the Cloud SQL level.
I'll keep you updated on the solution we find.
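Alongside the Cloud SQL checks, one quick test that can be run from a notebook on the same cluster is whether a plain TCP connection to the database host and port can be opened at all. A minimal sketch using only the Python standard library. The function name can_reach is hypothetical, and this only tests network reachability, not authentication, SSL or the JDBC driver itself:

```python
import socket


def can_reach(host, port, timeout_s=5.0):
    """Return True if a TCP connection to host:port opens within timeout_s."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers timeouts, refused connections and DNS resolution failures.
        return False
```

For example, can_reach(host, 5432) with the Postgres host from the connection options. If this returns False from the workflow cluster but True from the VM where the process currently runs, the difference is more likely in networking (firewall rules, authorized networks, private IP setup) than in the Spark configuration.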
01-24-2023 12:38 AM
Hi all,
I finally applied the following workaround: adding a persist on the DataFrame when fetching the data from the DB.
Since the data volume is small, it works.
def get_table_from_db(self, table_name) -> pyspark.sql.DataFrame:
    df = self.spark.read.format("jdbc").option("badRecordsPath", "/tmp/badRecordsPath").options(
        **self.kwargs[f"{self.db_name}_options"]) \
        .option("dbtable", table_name).load().persist(StorageLevel.DISK_ONLY)
    return df