07-01-2016 10:23 AM
Hi there,
I'm just getting started with Spark and I've got a moderately sized DataFrame created from collating CSVs in S3 (88 columns, 860k rows) that seems to be taking an unreasonable amount of time to insert (using SaveMode.Append) into Postgres. In a naive implementation, inserting this DataFrame took on the order of 5 hours to complete with the following environment:
Code:
import org.myorg.model.MyType

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ DataFrame, Row, SaveMode }
import org.apache.spark.sql.functions._

import scala.util.{ Success, Try }

val s3Path = s"s3a://path/to/files/*.csv"

// Read the set of CSVs into a single unified DataFrame
val csvFrame: DataFrame = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load(s3Path)

// Process/clean up data in the DataFrame to an RDD[MyType] with some row processing logic
val processFn: Row => Try[MyType] = MyType.fromRow _
val tempRdd: RDD[MyType] = csvFrame.map(processFn)
  .collect { case Success(row) => row }

// Splice the frame with filler metadata columns
val df = sqlContext.createDataFrame(tempRdd)
  .withColumn("hash", lit("my_hash"))
  .withColumn("created_at", current_timestamp)

val jdbcUrl = s"jdbc:postgresql://my-rds-host.rds.amazonaws.com:5432/my_db?user=my_user&password=my_password&stringtype=unspecified"
val connectionProperties = new java.util.Properties()

df.write
  .mode(SaveMode.Append)
  .jdbc(
    url = jdbcUrl,
    table = "my_schema.my_table",
    connectionProperties = connectionProperties)
Testing this insertion against limited sample sizes of the DataFrame (using the .limit(n) combinator) yielded unscientific, not-thorough numbers; larger sample sizes seemed to have wildly differing (but consistently long) execution times.
Observing the RDS instance's monitoring shows it barely gets above an average of 3% CPU utilization and an average of 13 write ops/sec.
Conversely, writing the whole DataFrame back to CSV in DBFS with spark-csv or to Spark tables with saveAsTable yielded more palatable times: ~40sec.
I haven't tried to optimize the target RDS setup yet (it's freshly provisioned), but it seems like a situation like this should mostly work out of the box. An alternative approach may be to output to CSV and use Postgres' COPY, but it'd be really nice for this to 'just work'.
Apologies if this is an overshare of information - I'm really puzzled why this is behaving so strangely with an implementation mostly adapted from documentation. Am I missing something obvious, or something terrifyingly nonobvious like network constraints?
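One Spark-side knob worth sketching, in case it is the "something obvious": more output partitions mean more concurrent JDBC connections, and newer Spark versions also read a batchsize property for the JDBC writer. The values below are arbitrary examples, and whether batchsize is honored on this Spark version is an assumption, not something verified here.
Code:
// Untested tuning sketch: repartition for more concurrent JDBC connections per write.
// The "batchsize" property may only be read by newer Spark JDBC writers.
val tunedProps = new java.util.Properties()
tunedProps.setProperty("batchsize", "10000") // assumption: likely ignored on older Spark versions

df.repartition(16) // arbitrary example partition count
  .write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, "my_schema.my_table", tunedProps)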
07-01-2016 10:51 AM
I'm not sure how to get it to nicely format the code into monospaced, syntax-highlighted blocks. The </> code button doesn't seem to work. If anyone knows how, let me know!
07-11-2016 09:59 AM
In case anyone was curious how I worked around this, I ended up dropping down to Postgres JDBC and using CopyManager to COPY rows in directly from Spark:
https://gist.github.com/longcao/bb61f1798ccbbfa4a0d7b76e49982f84
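For anyone who doesn't want to click through, a minimal sketch of the CopyManager approach might look roughly like the following. This is not the gist verbatim: the table name, delimiter handling, and connection setup are simplified assumptions, and rows are buffered in memory per partition rather than streamed.
Code:
import java.io.ByteArrayInputStream
import java.sql.DriverManager

import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

// Sketch: issue one COPY ... FROM STDIN per partition over a plain JDBC connection.
// Assumes rows can be rendered as naive CSV (no quoting/escaping of embedded commas).
def copyPartition(rows: Iterator[org.apache.spark.sql.Row], url: String, table: String): Unit = {
  val conn = DriverManager.getConnection(url)
  try {
    val copyManager = new CopyManager(conn.asInstanceOf[BaseConnection])
    val payload = rows.map(row => row.mkString(",") + "\n").mkString.getBytes("UTF-8")
    copyManager.copyIn(
      s"COPY $table FROM STDIN WITH (FORMAT CSV)",
      new ByteArrayInputStream(payload))
  } finally {
    conn.close()
  }
}

// One connection and one COPY per partition of the DataFrame.
df.rdd.foreachPartition { rows =>
  copyPartition(rows, jdbcUrl, "my_schema.my_table")
}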
07-28-2016 06:14 AM
Hi
I tried your solution, but I get 'syntax error at or near WITH'. Also, I had to write it like this:
Code:
val bytes: Iterator[Byte] = rows.map { row =>
  (row.mkString(delimiter) + "\n").getBytes.toIterator
}.flatten
Could you please share how to make it work?
07-28-2016 01:26 PM
Hmm, I'm not entirely sure why the snippet might not work for your setup. Is it possible that you're running a PostgreSQL version that's not compatible with the COPY command in the snippet? I was running PostgreSQL 9.5.x on RDS.
As for the flatten, I think there is an implicit conversion (https://github.com/scala/scala/blob/v2.10.5/src/library/scala/collection/TraversableOnce.scala#L389) that allowed me to flatten the Iterator[Array[Byte]] to an Iterator[Byte]. An IDE may complain about that, or you may have it disabled somehow, but it was working in a notebook on Spark 1.6.1 for me.
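If the implicit conversion isn't being picked up in your environment, one way to sidestep it entirely is an explicit flatMap. Same assumptions as your snippet: rows is an Iterator of Spark Rows and delimiter is your column separator.
Code:
// Explicit flatMap: no reliance on the Iterator[Array[Byte]] -> Iterator[Byte] implicit flatten.
val bytes: Iterator[Byte] = rows.flatMap { row =>
  (row.mkString(delimiter) + "\n").getBytes("UTF-8").toIterator
}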
07-29-2016 01:17 AM
Thanks for the response. I'm running my job with this driver:
postgresql:9.4-1201-jdbc41
I have an AWS Redshift service with PostgreSQL behind it. We'll be migrating from Redshift soon, which is why I'm using a direct JDBC connection to write to Postgres. Maybe the Redshift service is causing a problem somehow. Anyway, thanks, I'll keep investigating.