JDBC connection timeout on workflow cluster

Fred_F
New Contributor III

Hi there,

I have a batch process configured in a workflow that fails due to a JDBC timeout on a Postgres DB.

I checked the JDBC connection configuration, and it seems to work: when I query a table and call df.show() in the process, it displays the fetched data. So the issue does not seem to come from there.

I tried a couple of configurations at the cluster level, but I still get the same issue.

The configurations I tried:

spark.master local[*, 4]
spark.databricks.cluster.profile singleNode
spark.executor.heartbeatInterval 3600s
spark.network.timeout 4000s
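For reference, spark.network.timeout and spark.executor.heartbeatInterval govern communication between Spark's own components; they are not passed to the PostgreSQL JDBC driver, which has its own timeouts. A minimal sketch, assuming the standard pgjdbc driver: its connectTimeout, socketTimeout and tcpKeepAlive connection parameters can be set directly in the JDBC URL. The helper name, host and values below are hypothetical and only illustrative.

```python
from urllib.parse import urlencode


def build_postgres_jdbc_url(host, port, database, connect_timeout_s=30,
                            socket_timeout_s=600, tcp_keep_alive=True):
    """Build a PostgreSQL JDBC URL carrying driver-level (pgjdbc) timeouts.

    Hypothetical helper; the default values are illustrative, not recommendations.
    """
    params = {
        "connectTimeout": connect_timeout_s,   # seconds allowed to open the connection
        "socketTimeout": socket_timeout_s,     # seconds a socket read may block (0 = no limit)
        "tcpKeepAlive": str(tcp_keep_alive).lower(),  # "true" / "false"
    }
    return f"jdbc:postgresql://{host}:{port}/{database}?{urlencode(params)}"


# Hypothetical Cloud SQL address and database name
url = build_postgres_jdbc_url("10.0.0.5", 5432, "mydb")
print(url)
```

The resulting string would go into the url option of spark.read.format("jdbc"); whether longer timeouts help depends on why the connection attempt fails in the first place.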

Note that the same process also connects to a MySQL DB, and that connection works with no noticeable issue.

The DB is hosted on GCP Cloud SQL, and our Databricks platform is on GCP as well.
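Since the failing tasks run on the cluster node itself (the stack trace in this post reports "executor driver"), one thing worth ruling out is intermittent reachability of the Cloud SQL endpoint from that node. A minimal sketch using only the Python standard library; the helper name and the address in the commented usage line are hypothetical placeholders. It only checks that a TCP connection opens, not that Postgres accepts the credentials, and it can be called several times in a row to spot intermittent failures.

```python
import socket


def can_reach(host, port, timeout_s=5.0):
    """Return True if a TCP connection to host:port opens within timeout_s seconds.

    Proves network reachability from the machine running it only; no
    PostgreSQL authentication is attempted.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


# Hypothetical usage from a notebook cell on the same cluster:
# print(can_reach("10.0.0.5", 5432))
```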

Let me know if you have any hints at the Databricks configuration level. Note also that this process currently runs on another VM with a local PySpark installation.

Here is the stack trace I get:

Py4JJavaError: An error occurred while calling o1829.checkpoint.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 58.0 failed 4 times, most recent failure: Lost task 33.3 in stage 58.0 (TID 673) (driver-656749566d-lxcst executor driver): org.postgresql.util.PSQLException: The connection attempt failed.
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:331)
	at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
	at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)
	at org.postgresql.Driver.makeConnection(Driver.java:400)
	at org.postgresql.Driver.connect(Driver.java:259)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:123)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:277)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
	at o


RKNutalapati
Valued Contributor

Hi @Fred Foucart,

Could you share the code snippet here so we can better understand the issue?

Thanks,

RK

Fred_F
New Contributor III

Hi @Rama Krishna N,

I use a class with a get_table_from_db method, as follows:

from enum import Enum


class ExtractData:

    def __init__(self, db: Enum, **kwargs):
        self.db_name = db.DB_NAME.value  # e.g. "post" selects kwargs["post_options"]
        self.db = db
        self.kwargs = kwargs  # connection options and Spark config
        self.spark = init_spark_session(kwargs["app_name"], **kwargs)
        self.loaded_tables = {}

    def get_table_from_db(self, table_name):
        # getOrCreate() returns the already-active session, so this is the same as self.spark
        spark = init_spark_session(self.kwargs["app_name"], **self.kwargs)
        return spark.read.format("jdbc").options(**self.kwargs[f"{self.db_name}_options"]) \
            .option("dbtable", table_name).load()

The kwargs contain the connection credentials.

The init_spark_session function is as follows:

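from pyspark import SparkConf
from pyspark.sql import SparkSession


def init_spark_session(spark_app_name, **kwargs):
    """
    SQL Spark instantiation
    :param spark_app_name: application name passed to SparkSession.builder.appName
    :param kwargs: must contain a "spark_config" dict of Spark configuration keys/values
    :return: the active (or newly created) SparkSession
    """

    called_method = SparkSession.builder
    conf = SparkConf()
    for key, value in kwargs["spark_config"].items():
        conf.set(key.replace('"', ''), value)  # remove any double quotes from the key
    return called_method.appName(spark_app_name).config(conf=conf).enableHiveSupport().getOrCreate()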

The connection arguments:

mysql_options: {
  host = ${VSA_HOST}
  database = ${VSA_DATABASE}
  url = "jdbc:mysql://"${mysql_options.host}"/"${mysql_options.database}"?zeroDateTimeBehavior=convertToNull"
  driver = "com.mysql.jdbc.Driver"
  user = ${VSA_USER}
  password = ${VSA_PASSWORD}
}
post_options: {
  host = ${HOST}
  port = ${PORT}
  database = ${DATABASE}
  url = "jdbc:postgresql://"${post_options.host}":"${post_options.port}"/"${post_options.database}
  driver = "org.postgresql.Driver"
  user = ${USER}
  password = ${PASSWORD}
}

RKNutalapati
Valued Contributor

Hi @Fred Foucart,

The code above looks good to me. Can you also try the code below?

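# Placeholders: replace with your own connection details
host = "<host>"
database = "<database>"
username = "<user>"
password = "<password>"

df = spark.read \
  .format("jdbc") \
  .option("url", f"jdbc:postgresql://{host}/{database}") \
  .option("driver", "org.postgresql.Driver") \
  .option("user", username) \
  .option("password", password) \
  .option("dbtable", "<TableName>") \
  .option("fetchsize", 5000) \
  .load()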

If the table is large, you can try parallel reads:

# Parameters
source_table = "<Your Table Name>"
partitionColumn = "<Primary Key Numeric Column>"
lowerBound = 1
upperBound = 10000  # max value of partitionColumn (~ total row count for a contiguous ID); our table has over a billion rows!
fetchsize = 1000
# Decide how many slices to read in parallel: rows per partition = total rows / partitions,
# e.g. 10000 / 20 = 500 rows per partition
num_partitions = 20

# Read
source_df = spark.read \
  .format("jdbc") \
  .option("url", f"jdbc:postgresql://{host}/{database}") \
  .option("driver", "org.postgresql.Driver") \
  .option("user", username) \
  .option("password", password) \
  .option("dbtable", source_table) \
  .option("partitionColumn", partitionColumn) \
  .option("lowerBound", lowerBound) \
  .option("upperBound", upperBound) \
  .option("numPartitions", num_partitions) \
  .option("fetchsize", fetchsize) \
  .load()
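To see what the four partitioning options do, here is a simplified Python re-implementation of how Spark's JDBC source splits the partitionColumn range into one WHERE clause per partition. It approximates Spark's internal logic rather than calling it, assumes integer bounds, and the function names are hypothetical.

```python
def _trunc_div(a, b):
    """Integer division truncating toward zero, like Java/Scala Long division."""
    q = abs(a) // abs(b)
    return q if (a >= 0) == (b > 0) else -q


def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the per-partition WHERE clauses of a partitioned JDBC read.

    Simplified sketch, not Spark's own code. None means "no WHERE clause".
    """
    if lower_bound > upper_bound:
        raise ValueError("lower_bound must not exceed upper_bound")
    if num_partitions <= 1 or lower_bound == upper_bound:
        return [None]  # a single partition reading the whole table
    # If the range is narrower than num_partitions, fewer partitions are used.
    if upper_bound - lower_bound < num_partitions:
        num_partitions = upper_bound - lower_bound
    # The bounds only determine the stride; they do not filter rows.
    stride = _trunc_div(upper_bound, num_partitions) - _trunc_div(lower_bound, num_partitions)
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper is None:
            predicates.append(lower)  # last partition: open-ended upwards (None if only one)
        elif lower is None:
            predicates.append(f"{upper} or {column} is null")  # first partition also takes NULLs
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


for p in jdbc_partition_predicates("id", 1, 10000, 20)[:2]:
    print(p)
```

Two consequences worth keeping in mind: lowerBound and upperBound only set the stride and do not filter rows (values outside the range fall into the first or last partition), and each partition is read by its own task over its own JDBC connection, so numPartitions also bounds how many connections are opened against the database at once.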

Fred_F
New Contributor III

Hi,

I tried all of these configurations, but I still get the same issue.

We need to check the configuration on the Cloud SQL side.
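One way to start checking the Cloud SQL side from the same cluster, assuming connection limits are a suspect: compare max_connections with the number of sessions currently open. This sketch builds Spark JDBC options that run a diagnostic query through Spark's query option instead of reading a table (Spark does not allow dbtable and query together, so dbtable is dropped). The helper and constant names are hypothetical, and pg_stat_activity also lists background processes, so the count is approximate.

```python
CONNECTION_USAGE_SQL = (
    "SELECT current_setting('max_connections')::int AS max_connections, "
    "(SELECT count(*) FROM pg_stat_activity) AS open_connections"
)


def connection_usage_options(base_options):
    """Return a copy of base_options that runs CONNECTION_USAGE_SQL via 'query'.

    Hypothetical helper; base_options is left unchanged.
    """
    opts = {k: v for k, v in base_options.items() if k != "dbtable"}
    opts["query"] = CONNECTION_USAGE_SQL
    return opts


# Hypothetical usage on the cluster, reusing the Postgres connection options:
# spark.read.format("jdbc").options(**connection_usage_options(post_options)).load().show()
```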

I'll keep you updated on the solution we find.

Thanks

Fred_F
New Contributor III

Hi all,

I finally applied the following workaround: adding a persist on the DataFrame when fetching the data from the DB.

Since the data volume is small, it works.

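from pyspark import StorageLevel
from pyspark.sql import DataFrame

class ExtractData:  # other methods as shown earlier
    def get_table_from_db(self, table_name) -> DataFrame:
        # persist() is lazy: the rows are written to disk on the first action
        df = self.spark.read.format("jdbc").option("badRecordsPath", "/tmp/badRecordsPath").options(
            **self.kwargs[f"{self.db_name}_options"]) \
            .option("dbtable", table_name).load().persist(StorageLevel.DISK_ONLY)
        return df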

Kaniz
Community Manager

Hi @Fred Foucart (Customer), it would mean a lot if you could select the "Best Answer" to help others find the correct answer faster.

This makes that answer appear right after the question, so it's easier to find within a thread.

It also helps us mark the question as answered so we can have more eyes helping others with unanswered questions.

Kaniz
Community Manager

Hi @Fred Foucart, we haven't heard from you since the last response from @Rama Krishna N, and I was checking back to see if his suggestions helped you.

Otherwise, if you have a solution, please share it with the community, as it can be helpful to others.

Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.
