Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Writing DataFrame to PostgreSQL via JDBC extremely slow (Spark 1.6.1)

longcao
New Contributor III

Hi there,

I'm just getting started with Spark and I've got a moderately sized DataFrame created from collating CSVs in S3 (88 columns, 860k rows) that seems to be taking an unreasonable amount of time to insert (using SaveMode.Append) into Postgres. In a naive implementation, inserting this DataFrame took on the order of 5 hours to complete with the following environment:

  • Source data is a set of 5 CSV files stored in S3 totalling ~250MB of raw data.
  • 3 node (1 driver, 2 worker) Spark 1.6.1, Hadoop 2 cluster running on Databricks Starter (running on AWS Account/VPC A)
  • Target Postgres RDS instance is a 500GB db.m4.2xlarge instance (running on AWS Account/VPC B)
    • Simple schema with no keys.
  • As noted, the RDS instance and Databricks Spark clusters are running on separate accounts and VPCs but are VPC peered and are able to reach each other.

Code:

import org.myorg.model.MyType

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ DataFrame, Row, SaveMode }
import org.apache.spark.sql.functions._

import scala.util.{ Success, Try }

val s3Path = s"s3a://path/to/files/*.csv"

// Read set of CSVs to single unified DataFrame
val csvFrame: DataFrame = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load(s3Path)

// Process/clean up data in DataFrame to an RDD[MyType] with some row processing logic
val processFn: Row => Try[MyType] = MyType.fromRow _
val tempRdd: RDD[MyType] = csvFrame.map(processFn)
  .collect { case Success(row) => row }

// Splice frame with filler metadata columns
val df = sqlContext.createDataFrame(tempRdd)
  .withColumn("hash", lit("my_hash"))
  .withColumn("created_at", current_timestamp)

val jdbcUrl = s"jdbc:postgresql://my-rds-host.rds.amazonaws.com:5432/my_db?user=my_user&password=my_password&stringtype=unspecified"

val connectionProperties = new java.util.Properties()

df.write
  .mode(SaveMode.Append)
  .jdbc(
    url = jdbcUrl,
    table = "my_schema.my_table",
    connectionProperties = connectionProperties)

Testing this insertion against limited sample sizes of the DataFrame (using the .limit(n) combinator) yielded these rough, unscientific numbers; larger sample sizes seemed to have wildly differing (but consistently long) execution times:

  • limit(100) = ~2.5sec
  • limit(1000) = ~3.1sec
  • limit(5000) = ~6.8sec
  • limit(7500) = ~110sec !
  • limit(10000) = ~412sec, ~813sec !

The RDS instance's monitoring shows it barely gets above an average of 3% CPU utilization and 13 write ops/sec.

Conversely, writing the whole DataFrame back to CSV in DBFS with spark-csv or to Spark tables with saveAsTable yielded more palatable times: ~40sec.

I haven't tried to optimize the target RDS setup yet (it's freshly provisioned), but it seems like a situation like this should mostly work out of the box. An alternative approach may be to output to CSV and use Postgres' COPY, but it'd be really nice for this to 'just work'.

Apologies if this is an overshare of information - I'm really puzzled as to why this is behaving so strangely with an implementation mostly adapted from the documentation. Am I missing something obvious, or something terrifyingly nonobvious like network constraints?

1 ACCEPTED SOLUTION


longcao
New Contributor III

In case anyone was curious how I worked around this, I ended up dropping down to Postgres JDBC and using CopyManager to COPY rows in directly from Spark:

https://gist.github.com/longcao/bb61f1798ccbbfa4a0d7b76e49982f84
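
In outline, the approach looks roughly like the sketch below. This is not the gist verbatim: the copyIn helper name, the comma delimiter, the per-partition connection handling, and the COPY options are assumptions, and quoting/escaping and error handling are omitted.

import java.io.ByteArrayInputStream
import java.sql.DriverManager

import org.apache.spark.sql.DataFrame
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

// Hypothetical helper: COPY each partition of a DataFrame into Postgres.
def copyIn(df: DataFrame, jdbcUrl: String, table: String): Unit = {
  df.rdd.foreachPartition { rows =>
    // One throwaway connection per partition; the URL carries user/password.
    val conn = DriverManager.getConnection(jdbcUrl)
    try {
      val copyManager = new CopyManager(conn.asInstanceOf[BaseConnection])
      // Render the partition as CSV-ish text; no quoting or escaping here.
      val bytes: Iterator[Byte] = rows.map { row =>
        (row.mkString(",") + "\n").getBytes("UTF-8")
      }.flatten
      // WITH (FORMAT CSV) is the PostgreSQL 9.0+ COPY option syntax.
      copyManager.copyIn(
        s"COPY $table FROM STDIN WITH (FORMAT CSV)",
        new ByteArrayInputStream(bytes.toArray))
    } finally {
      conn.close()
    }
  }
}

copyIn(df, jdbcUrl, "my_schema.my_table")

Buffering a whole partition into a byte array keeps the sketch short; for large partitions you would stream into the CopyManager instead.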


5 REPLIES

longcao
New Contributor III

I'm not sure how to get the code to format nicely into monospaced, syntax-highlighted blocks. The </> code button doesn't seem to work. If anyone knows how, let me know!


ALincoln
New Contributor II

Hi

Tried your solution, but I get 'syntax error at or near WITH'. Also, I had to write it like this:

val bytes: Iterator[Byte] = rows.map { row =>
  (row.mkString(delimiter) + "\n").getBytes.toIterator
}.flatten

Could you please share how to make it work?

longcao
New Contributor III

Hmm, not entirely sure why the snippet might not work for your setup. Is it possible that you are running on a PostgreSQL version that's not compatible with the COPY command in the snippet? I was running PostgreSQL 9.5.x on RDS.

As for the flatten, I think there is an implicit conversion (https://github.com/scala/scala/blob/v2.10.5/src/library/scala/collection/TraversableOnce.scala#L389) that allowed me to flatten the Iterator[Array[Byte]] to an Iterator[Byte]. An IDE may complain about that, or you may have it disabled somehow, but it was working in a notebook on Spark 1.6.1 for me.
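
For illustration, both spellings on a toy iterator (the byte values are just placeholders, assuming Scala 2.10/2.11):

// Relies on the implicit flattenTraversableOnce conversion:
val viaImplicit: Iterator[Byte] =
  Iterator(Array[Byte](1, 2), Array[Byte](3)).flatten

// Equivalent without relying on the implicit, closer to what you wrote:
val viaFlatMap: Iterator[Byte] =
  Iterator(Array[Byte](1, 2), Array[Byte](3)).flatMap(_.toIterator)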

ALincoln
New Contributor II

Thanks for the response. I'm running my job with this driver:

postgresql:9.4-1201-jdbc41

I have an AWS Redshift service with PostgreSQL behind it. We'll be migrating off Redshift soon, which is why I'm using a direct JDBC connection to write to Postgres. Maybe the Redshift service is causing a problem somehow. Anyway, thanks; I'll keep investigating.
