07-01-2016 10:23 AM
Hi there,
I'm just getting started with Spark and I've got a moderately sized DataFrame created from collating CSVs in S3 (88 columns, 860k rows) that seems to be taking an unreasonable amount of time to insert (using SaveMode.Append) into Postgres. In a naive implementation, inserting this DataFrame took on the order of 5 hours to complete with the following environment:
- Source data is a set of 5 CSV files stored in S3 totalling ~250MB of raw data.
- 3-node (1 driver, 2 workers) Spark 1.6.1, Hadoop 2 cluster running on Databricks Starter (running on AWS Account/VPC A)
- Target Postgres RDS instance is a 500GB db.m4.2xlarge instance (running on AWS Account/VPC B)
- Simple schema with no keys.
- As noted, the RDS instance and Databricks Spark clusters are running on separate accounts and VPCs but are VPC peered and are able to reach each other.
Code:
import org.myorg.model.MyType
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ DataFrame, Row, SaveMode }
import org.apache.spark.sql.functions._
import scala.util.{ Success, Try }

val s3Path = "s3a://path/to/files/*.csv"

// Read set of CSVs to single unified DataFrame
val csvFrame: DataFrame = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load(s3Path)

// Process/clean up data in DataFrame to an RDD[MyType] with some row processing logic
val processFn: Row => Try[MyType] = MyType.fromRow _
val tempRdd: RDD[MyType] = csvFrame.map(processFn)
  .collect { case Success(row) => row } // keep only rows that parsed successfully

// Splice frame with filler metadata columns
val df = sqlContext.createDataFrame(tempRdd)
  .withColumn("hash", lit("my_hash"))
  .withColumn("created_at", current_timestamp())

val jdbcUrl = "jdbc:postgresql://my-rds-host.rds.amazonaws.com:5432/my_db?user=my_user&password=my_password&stringtype=unspecified"
val connectionProperties = new java.util.Properties()

// Append the frame to the target Postgres table over JDBC
df.write
  .mode(SaveMode.Append)
  .jdbc(
    url = jdbcUrl,
    table = "my_schema.my_table",
    connectionProperties = connectionProperties)
Testing this insertion against limited sample sizes of the DataFrame (using the .limit(n) combinator) yielded the unscientific, not-very-thorough numbers below. Larger sample sizes had wildly differing (but consistently long) execution times:
- limit(100) = ~2.5sec
- limit(1000) = ~3.1sec
- limit(5000) = ~6.8sec
- limit(7500) = ~110sec !
- limit(10000) = ~412sec, ~813sec !
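To put those timings in perspective, here is a rough back-of-the-envelope sketch that converts them to rows per second. It only uses the figures quoted in this post (the "full frame" entry assumes the 860k rows and roughly 5 hours mentioned at the top); the object and method names are just for illustration.

```scala
// Back-of-the-envelope insert throughput from the timings quoted in this post.
object InsertThroughput {
  // (label, rows inserted, elapsed seconds)
  val samples: Seq[(String, Long, Double)] = Seq(
    ("limit(100)", 100L, 2.5),
    ("limit(1000)", 1000L, 3.1),
    ("limit(5000)", 5000L, 6.8),
    ("limit(7500)", 7500L, 110.0),
    ("limit(10000), run 1", 10000L, 412.0),
    ("limit(10000), run 2", 10000L, 813.0),
    ("full frame (~5 hours)", 860000L, 5 * 3600.0)
  )

  // Average throughput over the whole run.
  def rowsPerSecond(rows: Long, seconds: Double): Double = rows / seconds

  def main(args: Array[String]): Unit =
    samples.foreach { case (label, rows, secs) =>
      println(f"$label%-22s ${rowsPerSecond(rows, secs)}%8.1f rows/sec")
    }
}
```

By this arithmetic the full-frame run averages roughly 48 rows/sec, against roughly 735 rows/sec for limit(5000), so throughput collapses somewhere between 5,000 and 7,500 rows rather than degrading gradually.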
Observing the RDS instance monitoring shows it's barely getting above an average 3% CPU utilization and an average of 13 write ops/sec.
Conversely, writing the whole DataFrame back to CSV in DBFS with spark-csv or to Spark tables with saveAsTable yielded more palatable times: ~40sec.
I haven't tried to optimize the target RDS setup yet (it's freshly provisioned), but it seems like a situation like this should mostly work out of the box. My alternative approach may be to output to CSV and use Postgres' COPY, but it'd be really nice for this to 'just work'.
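For reference, a minimal sketch of what the COPY fallback might involve on the client side: building the COPY statement and the CSV lines to feed it. This is an assumption about how I'd approach it, not something I've run against the RDS instance. The CopyPayload object and its helpers are hypothetical names, and the column list is just the two filler columns from the code above. Actually streaming the lines to Postgres (for example through the PostgreSQL JDBC driver's CopyManager.copyIn) is left out.

```scala
// Sketch only: builds the text that a COPY ... FROM STDIN call would consume.
// Nothing here talks to the database.
object CopyPayload {
  // Quote a field for Postgres' CSV format: wrap it in double quotes and
  // double any embedded quotes. None becomes an unquoted empty field, which
  // COPY ... (FORMAT csv) reads as NULL by default.
  def csvField(value: Option[String]): String = value match {
    case Some(v) => "\"" + v.replace("\"", "\"\"") + "\""
    case None    => ""
  }

  // One CSV line per row, fields in the same order as the COPY column list.
  def csvLine(fields: Seq[Option[String]]): String =
    fields.map(csvField).mkString(",")

  // COPY statement that reads CSV from the client connection.
  // Column names are assumed to need no identifier quoting.
  def copyStatement(table: String, columns: Seq[String]): String =
    s"COPY $table (${columns.mkString(", ")}) FROM STDIN WITH (FORMAT csv)"
}
```

For example, CopyPayload.copyStatement("my_schema.my_table", Seq("hash", "created_at")) gives the statement to run, and each row would go through csvLine before being written to the stream.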
Apologies if this is an overshare of information - I'm really puzzled why this is behaving so strangely with an implementation mostly adapted from the documentation. Am I missing something obvious, or something terrifyingly nonobvious like network constraints?