01-09-2023 06:57 AM
Hi there,
I have a batch process configured in a workflow that fails with a JDBC timeout on a Postgres DB.
I checked the JDBC connection configuration and it seems to work: when I query a table and call df.show() in the process, the fetched data is displayed. So the issue does not seem to come from there.
I tried a couple of configurations at cluster level, but I still get the same issue.
The conf I tried:
spark.master local[*, 4]
spark.databricks.cluster.profile singleNode
spark.executor.heartbeatInterval 3600s
spark.network.timeout 4000s
Note that, in the same process, there is another connection to a MySQL DB which works with no noticeable issue.
The DB is hosted on GCP Cloud SQL and our Databricks platform is on GCP as well.
Let me know if you have any hints at the configuration level in Databricks, knowing that this process currently runs on another VM with a local PySpark.
Here is the stack trace I get:
Py4JJavaError: An error occurred while calling o1829.checkpoint.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 58.0 failed 4 times, most recent failure: Lost task 33.3 in stage 58.0 (TID 673) (driver-656749566d-lxcst executor driver): org.postgresql.util.PSQLException: The connection attempt failed.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:331)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)
at org.postgresql.Driver.makeConnection(Driver.java:400)
at org.postgresql.Driver.connect(Driver.java:259)
at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:123)
at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:277)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
at o
01-09-2023 07:08 AM
Hi @Fred Foucart,
Could you share the code snippet here so we can understand the problem better?
Thanks,
RK
01-09-2023 07:16 AM
Hi @Rama Krishna N,
I use a class with a get_table_from_db method, as follows:
class ExtractData:
    def __init__(self, db: Enum, **kwargs):
        self.db_name = db.DB_NAME.value
        self.db = db
        self.kwargs = kwargs
        self.spark = init_spark_session(kwargs["app_name"], **kwargs)
        self.loaded_tables = {}

    def get_table_from_db(self, table_name):
        spark = init_spark_session(self.kwargs["app_name"], **self.kwargs)
        return spark.read.format("jdbc").options(**self.kwargs[f"{self.db_name}_options"]) \
            .option("dbtable", table_name).load()
The kwargs contain the connection credentials.
init_spark_session is as follows:
def init_spark_session(spark_app_name, **kwargs):
    """
    SQL Spark instantiation
    :param spark_app_name:
    :param kwargs:
    :return:
    """
    called_method = SparkSession.builder
    conf = SparkConf()
    for key, value in kwargs["spark_config"].items():
        conf.set(key.replace('"', ''), value)
    return called_method.appName(spark_app_name).config(conf=conf).enableHiveSupport().getOrCreate()
The connection arguments:
mysql_options: {
    host = ${VSA_HOST}
    database = ${VSA_DATABASE}
    url = "jdbc:mysql://"${VSA_options.host}"/"${VSA_options.database}"?zeroDateTimeBehavior=convertToNull"
    driver = "com.mysql.jdbc.Driver"
    user = ${VSA_USER}
    password = ${VSA_PASSWORD}
}
post_options: {
    host = ${HOST}
    port = ${PORT}
    database = ${DATABASE}
    url = "jdbc:postgresql://"${CDB_options.host}":"${CDB_options.port}"/"${CDB_options.database}
    driver = "org.postgresql.Driver"
    user = ${USER}
    password = ${PASSWORD}
}
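For reference, the Spark settings listed earlier (spark.network.timeout, spark.executor.heartbeatInterval) apply to Spark's own network traffic, so they may not change how long the PostgreSQL driver waits when it opens a connection. A minimal sketch of building the same Postgres options in Python with driver-level timeouts appended to the URL follows. The function name build_postgres_jdbc_options and the timeout values are hypothetical; connectTimeout and socketTimeout are PostgreSQL JDBC connection parameters expressed in seconds, and their behavior should be checked against the documentation of the driver version in use.

```python
from urllib.parse import urlencode


def build_postgres_jdbc_options(host, port, database, user, password,
                                connect_timeout_s=30, socket_timeout_s=0):
    """Build a Spark JDBC options dict for Postgres (hypothetical helper).

    connect_timeout_s / socket_timeout_s are passed to the PostgreSQL JDBC
    driver as URL parameters; 0 means "no timeout" for the driver.
    """
    params = urlencode({
        "connectTimeout": connect_timeout_s,
        "socketTimeout": socket_timeout_s,
    })
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}?{params}",
        "driver": "org.postgresql.Driver",
        "user": user,
        "password": password,
    }


# Placeholder values for illustration only.
options = build_postgres_jdbc_options("db.example", 5432, "mydb", "me", "secret")
print(options["url"])
```

The resulting dict can be passed to spark.read.format("jdbc").options(**options) in the same way as the post_options block above.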
01-09-2023 11:51 AM
Hi @Fred Foucart,
The above code looks good to me. Can you also try the code below?
spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{host}/{database}") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", username) \
    .option("password", password) \
    .option("dbtable", "<TableName>") \
    .option("fetchsize", 5000) \
    .load()
If the table is huge, you can try parallel reads.
# Params
table_name = "<Your Table Name>"
partitionColumn = "<Primary Key Numeric Column>"
lowerBound = 1
upperBound = 10000  # <Total Row Count>; our table contains over a billion rows!
fetchsize = 1000
# Do some math on how many slices the data should be partitioned and read in:
# Total Records / Partitions, i.e. 10000 / 20 = 500 rows per thread
num_partitions = 20

# Read
source_df = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{host}/{database}") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", username) \
    .option("password", password) \
    .option("dbtable", table_name) \
    .option("partitionColumn", partitionColumn) \
    .option("lowerBound", lowerBound) \
    .option("upperBound", upperBound) \
    .option("numPartitions", num_partitions) \
    .option("fetchsize", fetchsize) \
    .load()
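To make the partition arithmetic concrete, here is a simplified sketch of how a partitioned JDBC read turns partitionColumn, lowerBound, upperBound and numPartitions into one WHERE clause per partition. It is an approximation for illustration, not Spark's actual code: the function name partition_predicates is hypothetical, and the exact boundaries Spark computes can differ between versions. The bounds only decide the stride; rows outside them still land in the first or last partition.

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the per-partition WHERE clauses of a partitioned JDBC read.

    Simplified illustration only; Spark's real stride computation may
    produce slightly different boundaries.
    """
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be smaller than upper_bound")
    if num_partitions < 2:
        return []  # single partition: no WHERE clause needed

    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        # The first partition has no lower limit, the last has no upper limit.
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if lower and upper:
            predicates.append(f"{lower} AND {upper}")
        elif upper:
            # NULL keys are collected by the first partition.
            predicates.append(f"{upper} OR {column} IS NULL")
        else:
            predicates.append(lower)
    return predicates


# Values from the example above: 10000 rows split into 20 slices of 500.
for predicate in partition_predicates("id", 1, 10000, 20)[:3]:
    print(predicate)
```

Each predicate is read by its own task over its own JDBC connection, so numPartitions also caps the number of concurrent connections opened against the database. That is worth keeping in mind when the failure is a refused or timed-out connection attempt.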
01-11-2023 11:27 PM
Hi,
I tried all of these configurations but still get the same issue.
We need to check the configuration at the Cloud SQL level.
I'll keep you updated on the solution we find.
Thanks
01-24-2023 12:38 AM
Hi all,
I finally applied the following workaround: adding a persist on the DataFrame when fetching the data from the DB.
Since the data volume is small, it works.
def get_table_from_db(self, table_name) -> pyspark.sql.DataFrame:
    df = self.spark.read.format("jdbc").option("badRecordsPath", "/tmp/badRecordsPath").options(
        **self.kwargs[f"{self.db_name}_options"]) \
        .option("dbtable", table_name).load().persist(StorageLevel.DISK_ONLY)
    return df
01-24-2023 01:22 AM
Hi @Fred Foucart, it would mean a lot if you could select the "Best Answer" to help others find the correct answer faster.
This makes that answer appear right after the question, so it's easier to find within a thread.
It also helps us mark the question as answered so we can have more eyes helping others with unanswered questions.
01-11-2023 07:18 AM
Hi @Fred Foucart, we haven't heard from you since the last response from @Rama Krishna N, and I was checking back to see if his suggestions helped you.
Otherwise, if you have a solution, please share it with the community, as it can be helpful to others.
Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.