Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

JDBC connection timeout on workflow cluster

Fred_F
New Contributor III

Hi there,

I have a batch process configured in a workflow that fails due to a JDBC timeout on a Postgres DB.

I checked the JDBC connection configuration, and it seems to work: when I query a table and call df.show() in the process, it displays the fetched data. So the issue does not seem to come from there.

I tried a couple of configurations at the cluster level, but I still get the same issue.

The conf I tried:

spark.master local[*, 4]
 
spark.databricks.cluster.profile singleNode
 
spark.executor.heartbeatInterval 3600s
 
spark.network.timeout 4000s

Note that, in the same process, there is another connection to a MySQL DB, which works with no noticeable issue.

The DB is hosted on GCP Cloud SQL, and our Databricks platform is on GCP as well.

Let me know if you have any hints at the configuration level in Databricks. Note also that this process currently runs on another VM with a local PySpark.

Here is the stack trace I get:

Py4JJavaError: An error occurred while calling o1829.checkpoint.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 58.0 failed 4 times, most recent failure: Lost task 33.3 in stage 58.0 (TID 673) (driver-656749566d-lxcst executor driver): org.postgresql.util.PSQLException: The connection attempt failed.
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:331)
	at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
	at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)
	at org.postgresql.Driver.makeConnection(Driver.java:400)
	at org.postgresql.Driver.connect(Driver.java:259)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:123)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:277)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
	at o
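
Editorial sketch, not part of the original post: the trace fails in ConnectionFactoryImpl.openConnectionImpl, which is where the PostgreSQL driver opens its connection, so one cheap thing to rule out is plain network reachability from the cluster to the database. The helper below uses only the Python standard library. The name can_reach and the host and port in the usage comment are hypothetical placeholders, and a successful TCP connect says nothing about authentication or connection limits on the database side.

```python
import socket


def can_reach(host: str, port: int, timeout_s: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within timeout_s."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


# Hypothetical usage from a notebook cell on the job cluster:
# can_reach("10.0.0.5", 5432)
```

Running this from the same cluster that runs the workflow would at least separate a network or firewall problem from a driver or database-side one.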

5 REPLIES

RKNutalapati
Valued Contributor

Hi @Fred Foucart,

Could you share the code snippet here so we can understand the issue better?

Thanks,

RK

Fred_F
New Contributor III

Hi @Rama Krishna N,

I use a class with a get_table_from_db method, as follows:

class ExtractData:
 
    def __init__(self, db: Enum, **kwargs):
        self.db_name = db.DB_NAME.value
        self.db = db
        self.kwargs = kwargs
        self.spark = init_spark_session(kwargs["app_name"], **kwargs)
        self.loaded_tables = {}
 
    def get_table_from_db(self, table_name):
        spark = init_spark_session(self.kwargs["app_name"], **self.kwargs)
        return spark.read.format("jdbc").options(**self.kwargs[f"{self.db_name}_options"]) \
            .option("dbtable", table_name).load()

The kwargs contain the connection credentials.

init_spark_session is as follows:

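from pyspark import SparkConf
from pyspark.sql import SparkSession


def init_spark_session(spark_app_name, **kwargs):
    """
    Build (or reuse) a Hive-enabled SparkSession.
    :param spark_app_name: application name passed to the session builder
    :param kwargs: must contain a "spark_config" dict of Spark settings
    :return: the SparkSession
    """
    # Copy every spark_config entry into a SparkConf, stripping stray quotes from keys.
    conf = SparkConf()
    for key, value in kwargs["spark_config"].items():
        conf.set(key.replace('"', ''), value)
    return SparkSession.builder.appName(spark_app_name).config(conf=conf).enableHiveSupport().getOrCreate()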

The connection arguments:

mysql_options: {
  host = ${VSA_HOST}
  database = ${VSA_DATABASE}
  url = "jdbc:mysql://"${VSA_options.host}"/"${VSA_options.database}"?zeroDateTimeBehavior=convertToNull"
  driver = "com.mysql.jdbc.Driver"
  user = ${VSA_USER}
  password = ${VSA_PASSWORD}
}
post_options: {
  host = ${HOST}
  port = ${PORT}
  database = ${DATABASE}
  url = "jdbc:postgresql://"${CDB_options.host}":"${CDB_options.port}"/"${CDB_options.database}
  driver = "org.postgresql.Driver"
  user = ${USER}
  password = ${PASSWORD}
}
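
Editorial sketch, not part of the original post: spark.network.timeout and spark.executor.heartbeatInterval govern Spark's own internal communication, not the sockets opened by the JDBC driver. The PostgreSQL JDBC driver has its own connectTimeout, socketTimeout and loginTimeout connection parameters (in seconds), which can be appended to the JDBC URL. The helper below shows one way to do that. The function name with_jdbc_params, the host, the database name and the timeout values are all hypothetical, and whether driver timeouts are the cause here is not established in this thread.

```python
from urllib.parse import urlencode


def with_jdbc_params(url: str, **params) -> str:
    """Append key=value connection parameters to a JDBC URL."""
    if not params:
        return url
    # Reuse the existing query string if the URL already has one.
    separator = "&" if "?" in url else "?"
    return url + separator + urlencode(params)


url = with_jdbc_params(
    "jdbc:postgresql://db-host:5432/mydb",  # hypothetical host and database
    connectTimeout=30,   # seconds allowed for opening the connection
    socketTimeout=600,   # seconds allowed for a blocking socket read
)
print(url)
# jdbc:postgresql://db-host:5432/mydb?connectTimeout=30&socketTimeout=600
```

The resulting string would replace the url entry of post_options; the other entries stay unchanged.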

RKNutalapati
Valued Contributor

Hi @Fred Foucart,

The above code looks good to me. Can you try the code below as well?

spark.read \
  .format("jdbc") \
  .option("url", f"jdbc:postgresql://{host}/{database}") \
  .option("driver", "org.postgresql.Driver") \
  .option("user", username) \
  .option("password", password) \
  .option("dbtable", "<TableName>") \
  .option("fetchsize", 5000) \
  .load()

If the table is huge, you can try parallel reads.

# Parameters
source_table = "<Your Table Name>"
partitionColumn = "<Primary Key Numeric Column>"
lowerBound = 1
upperBound = 10000  # <Total Row Count>; our table contains over a billion rows!
fetchsize = 1000
# Do some math on how many slices the data should be partitioned and read in:
# total records / partitions, i.e. 10000 / 20 = 500 rows per thread.
num_partitions = 20

# Read
source_df = spark.read \
  .format("jdbc") \
  .option("url", f"jdbc:postgresql://{host}/{database}") \
  .option("driver", "org.postgresql.Driver") \
  .option("user", username) \
  .option("password", password) \
  .option("dbtable", source_table) \
  .option("partitionColumn", partitionColumn) \
  .option("lowerBound", lowerBound) \
  .option("upperBound", upperBound) \
  .option("numPartitions", num_partitions) \
  .option("fetchsize", fetchsize) \
  .load()
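
Editorial sketch, not part of the original reply: Spark turns partitionColumn, lowerBound, upperBound and numPartitions into one range query per partition, and each partition opens its own JDBC connection, so numPartitions also caps how many connections can hit the database at once. The function below is a simplified approximation of that splitting, written for illustration only; it is not Spark's actual code. The name partition_predicates and the column name id are hypothetical. The bounds only set the stride: rows outside them still land in the first or last partition.

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Approximate the WHERE clauses of a partitioned JDBC read (simplified)."""
    if num_partitions <= 1:
        return []  # a single partition reads the whole table, no predicate
    # Simplified integer stride; Spark's real arithmetic handles more edge cases.
    stride = upper // num_partitions - lower // num_partitions
    predicates = []
    for i in range(num_partitions):
        lo = lower + i * stride
        hi = lo + stride
        if i == 0:
            # The first slice also picks up NULLs and values below lowerBound.
            predicates.append(f"{column} < {hi} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # The last slice is open-ended, so values above upperBound are kept.
            predicates.append(f"{column} >= {lo}")
        else:
            predicates.append(f"{column} >= {lo} AND {column} < {hi}")
    return predicates


preds = partition_predicates("id", 1, 10000, 20)
print(preds[0])   # id < 501 OR id IS NULL
print(preds[1])   # id >= 501 AND id < 1001
print(preds[-1])  # id >= 9501
```

With the values from the reply above, this yields 20 slices of 500 ids each, which means up to 20 simultaneous connections if enough task slots are available.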

Fred_F
New Contributor III

Hi,

I tried all these configurations, but I still get the same issue.

We need to check the configuration at the Cloud SQL level.

I'll keep you updated on the solution we find.

Thanks

Fred_F
New Contributor III

Hi all,

I finally applied the following workaround: adding a persist on the DataFrame when fetching the data from the DB.

Since the data volume is small, it works:

    def get_table_from_db(self, table_name) -> pyspark.sql.DataFrame:
        df = self.spark.read.format("jdbc").option("badRecordsPath", "/tmp/badRecordsPath").options(
            **self.kwargs[f"{self.db_name}_options"]) \
            .option("dbtable", table_name).load().persist(StorageLevel.DISK_ONLY)
        return df
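
Editorial sketch, not part of the original post: a likely explanation for why persist helps (an interpretation, not confirmed in the thread) is that Spark DataFrames are lazy. Without caching, every action that touches the JDBC-backed DataFrame re-runs the read and opens fresh connections; after persist, the first action materializes the rows and later actions reuse the stored copy. The toy class below mimics that behaviour in plain Python, with no Spark involved. LazyTable and fake_read are hypothetical names used only for illustration.

```python
class LazyTable:
    """Toy stand-in for a lazily evaluated, JDBC-backed DataFrame."""

    def __init__(self, read_fn):
        self._read_fn = read_fn  # simulates connecting to the DB and reading
        self._cached = None
        self._persist = False
        self.reads = 0           # how many times the source was hit

    def persist(self):
        self._persist = True     # only marks for caching; nothing is read yet
        return self

    def collect(self):
        # Serve from the cache once it has been filled by an earlier action.
        if self._persist and self._cached is not None:
            return self._cached
        self.reads += 1
        rows = self._read_fn()
        if self._persist:
            self._cached = rows
        return rows


def fake_read():
    return [1, 2, 3]


plain = LazyTable(fake_read)
plain.collect()
plain.collect()

cached = LazyTable(fake_read).persist()
cached.collect()
cached.collect()

print(plain.reads, cached.reads)  # 2 1
```

The trade-off is that the cached copy must fit in the chosen storage, which matches the remark above that the workaround is acceptable because the data volume is small.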
