<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Writing DataFrame to PostgreSQL via JDBC extremely slow (Spark 1.6.1) in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29595#M21318</link>
    <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;In case anyone was curious how I worked around this, I ended up dropping down to Postgres JDBC and using CopyManager to COPY rows in directly from Spark:&lt;/P&gt;
&lt;P&gt;&lt;A href="https://gist.github.com/longcao/bb61f1798ccbbfa4a0d7b76e49982f84" target="_blank"&gt;https://gist.github.com/longcao/bb61f1798ccbbfa4a0d7b76e49982f84&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;&lt;/P&gt;</description>
    <pubDate>Mon, 11 Jul 2016 16:59:35 GMT</pubDate>
    <dc:creator>longcao</dc:creator>
    <dc:date>2016-07-11T16:59:35Z</dc:date>
    <item>
      <title>Writing DataFrame to PostgreSQL via JDBC extremely slow (Spark 1.6.1)</title>
      <link>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29593#M21316</link>
      <description>&lt;P&gt;Hi there,&lt;/P&gt;&lt;P&gt;I'm just getting started with Spark, and I have a moderately sized DataFrame created by collating CSVs in S3 (&lt;B&gt;88 columns, 860k rows&lt;/B&gt;) that is taking an unreasonable amount of time to insert (using SaveMode.Append) into Postgres. A naive implementation took on the order of &lt;B&gt;5 hours&lt;/B&gt; to complete in the following environment:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Source data is a set of 5 CSV files stored in S3 totalling &lt;B&gt;~250MB&lt;/B&gt; of raw data.&lt;/LI&gt;&lt;LI&gt;&lt;B&gt;3 node (1 driver, 2 worker)&lt;/B&gt; Spark 1.6.1, Hadoop 2 cluster running on Databricks Starter (running on AWS Account/VPC A)&lt;/LI&gt;&lt;LI&gt;Target Postgres RDS instance is a 500GB &lt;B&gt;db.m4.2xlarge&lt;/B&gt; instance (running on AWS Account/VPC B)&lt;/LI&gt;&lt;LI&gt;Simple schema with no keys.&lt;/LI&gt;&lt;LI&gt;As noted, the RDS instance and Databricks Spark cluster run on separate accounts and VPCs, but they are VPC peered and can reach each other.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Code:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import org.myorg.model.MyType

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ DataFrame, Row, SaveMode }
import org.apache.spark.sql.functions._

import scala.util.{ Success, Try }

val s3Path = s"s3a://path/to/files/*.csv"

// Read set of CSVs to single unified DataFrame
val csvFrame: DataFrame = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .load(s3Path)

// Process/clean up data in DataFrame to an RDD[MyType] with some row processing logic
val processFn: Row =&amp;gt; Try[MyType] = MyType.fromRow _
val tempRdd: RDD[MyType] = csvFrame.map(processFn)
  .collect { case Success(row) =&amp;gt; row }

// Splice frame with filler metadata columns
val df = sqlContext.createDataFrame(tempRdd)
  .withColumn("hash", lit("my_hash"))
  .withColumn("created_at", current_timestamp)

val jdbcUrl = s"jdbc:postgresql://my-rds-host.rds.amazonaws.com:5432/my_db?user=my_user&amp;amp;password=my_password&amp;amp;stringtype=unspecified"

val connectionProperties = new java.util.Properties()

df.write
  .mode(SaveMode.Append)
  .jdbc(
    url = jdbcUrl,
    table = "my_schema.my_table",
    connectionProperties = connectionProperties)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Testing this insertion against limited sample sizes of the DataFrame (using the &lt;I&gt;.limit(n)&lt;/I&gt; combinator) yielded these unscientific/not-thorough numbers, where larger sample sizes had wildly differing (but long) execution times:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;limit(100) = ~2.5sec&lt;/LI&gt;&lt;LI&gt;limit(1000) = ~3.1sec&lt;/LI&gt;&lt;LI&gt;limit(5000) = ~6.8sec&lt;/LI&gt;&lt;LI&gt;limit(7500) = ~110sec !&lt;/LI&gt;&lt;LI&gt;limit(10000) = ~412sec, ~813sec !&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&lt;B&gt;Monitoring on the RDS instance shows it barely gets above an average 3% CPU utilization and an average of 13 write ops/sec.&lt;/B&gt;&lt;/P&gt;&lt;P&gt;Conversely, writing the whole DataFrame back to CSV in DBFS with spark-csv, or to Spark tables with saveAsTable, yielded a much more palatable time: ~40sec.&lt;/P&gt;&lt;P&gt;I haven't yet tried to optimize the target RDS setup (it's freshly provisioned), but it seems like a situation like this should mostly work out of the box. My other alternative approach would be to output to CSV and use Postgres' COPY, but it would be really nice for this to 'just work'.&lt;/P&gt;&lt;P&gt;Apologies if this is an overshare of information - I'm really puzzled why this behaves so strangely with an implementation mostly adapted from documentation. Am I missing something obvious, or something terrifyingly nonobvious like network constraints?&lt;/P&gt;</description>
      <pubDate>Fri, 01 Jul 2016 17:23:05 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29593#M21316</guid>
      <dc:creator>longcao</dc:creator>
      <dc:date>2016-07-01T17:23:05Z</dc:date>
    </item>
    <item>
      <title>Re: Writing DataFrame to PostgreSQL via JDBC extremely slow (Spark 1.6.1)</title>
      <link>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29594#M21317</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;I'm not sure how to get the code to format nicely into monospaced, syntax-highlighted blocks - the &amp;lt;/&amp;gt; code button doesn't seem to work. If anyone knows, let me know!&lt;/P&gt; 
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 01 Jul 2016 17:51:35 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29594#M21317</guid>
      <dc:creator>longcao</dc:creator>
      <dc:date>2016-07-01T17:51:35Z</dc:date>
    </item>
    <item>
      <title>Re: Writing DataFrame to PostgreSQL via JDBC extremely slow (Spark 1.6.1)</title>
      <link>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29595#M21318</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;In case anyone was curious how I worked around this, I ended up dropping down to Postgres JDBC and using CopyManager to COPY rows in directly from Spark:&lt;/P&gt;
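&lt;P&gt;In outline, the workaround looks like the sketch below (a simplified, untested adaptation of the gist, reusing the df and jdbcUrl from the original post; the table name and delimiter are illustrative, and the plain mkString serialization does no CSV quoting or escaping):&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;import java.io.ByteArrayInputStream
import java.sql.DriverManager

import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

df.rdd.foreachPartition { rows =&amp;gt;
  // One pgjdbc connection per partition; CopyManager needs the underlying BaseConnection
  val conn = DriverManager.getConnection(jdbcUrl).asInstanceOf[BaseConnection]
  try {
    // Serialize this partition's rows to delimited text and stream them via COPY
    val bytes = rows.flatMap(row =&amp;gt; (row.mkString("|") + "\n").getBytes("UTF-8"))
    val input = new ByteArrayInputStream(bytes.toArray)
    new CopyManager(conn).copyIn(
      "COPY my_schema.my_table FROM STDIN WITH (FORMAT CSV, DELIMITER '|')", input)
  } finally {
    conn.close()
  }
}&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;COPY streams the rows in bulk instead of issuing per-row INSERT statements, which is where the speedup came from for me.&lt;/P&gt;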
&lt;P&gt;&lt;A href="https://gist.github.com/longcao/bb61f1798ccbbfa4a0d7b76e49982f84" target="_blank"&gt;https://gist.github.com/longcao/bb61f1798ccbbfa4a0d7b76e49982f84&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 11 Jul 2016 16:59:35 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29595#M21318</guid>
      <dc:creator>longcao</dc:creator>
      <dc:date>2016-07-11T16:59:35Z</dc:date>
    </item>
    <item>
      <title>Re: Writing DataFrame to PostgreSQL via JDBC extremely slow (Spark 1.6.1)</title>
      <link>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29596#M21319</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;Hi&lt;/P&gt;
&lt;P&gt;Tried your solution, but I get 'syntax error at or near WITH'. Also, I had to write the byte serialization like this:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;val bytes: Iterator[Byte] = rows.map { row =&amp;gt;
  (row.mkString(delimiter) + "\n").getBytes.toIterator
}.flatten&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;Could you please share how to make it work?&lt;/P&gt; 
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 28 Jul 2016 13:14:05 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29596#M21319</guid>
      <dc:creator>ALincoln</dc:creator>
      <dc:date>2016-07-28T13:14:05Z</dc:date>
    </item>
    <item>
      <title>Re: Writing DataFrame to PostgreSQL via JDBC extremely slow (Spark 1.6.1)</title>
      <link>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29597#M21320</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;Hmm, not entirely sure why the snippet might not work for your setup. Is it possible that you are running on a PostgreSQL version that's not compatible with the COPY command in the snippet? I was running PostgreSQL 9.5.x on RDS.&lt;/P&gt;
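&lt;P&gt;One concrete incompatibility worth checking (an assumption on my part, not something I've verified against your setup): the parenthesized COPY options list was only added in PostgreSQL 9.0, so an older server rejects the statement right at WITH. The two forms, with an illustrative table and delimiter:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;// PostgreSQL 9.0+ form, as used in the gist
val copySql = "COPY my_schema.my_table FROM STDIN WITH (FORMAT CSV, DELIMITER '|')"

// Legacy pre-9.0 form that older servers accept instead
val copySqlLegacy = "COPY my_schema.my_table FROM STDIN WITH DELIMITER '|' CSV"&lt;/CODE&gt;&lt;/PRE&gt;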
&lt;P&gt;As for the flatten, I think there is an implicit conversion (https://github.com/scala/scala/blob/v2.10.5/src/library/scala/collection/TraversableOnce.scala#L389) that allowed me to flatten the Iterator[Array[Byte]] to an Iterator[Byte]. An IDE may complain about it, or you may have it disabled somehow, but it was working for me in a notebook on Spark 1.6.1.&lt;/P&gt; 
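&lt;P&gt;A minimal standalone example of what that implicit conversion permits (illustrative values only):&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;val rows = Iterator("a|b", "c|d")
// flattenTraversableOnce lifts the Iterator[Array[Byte]] produced by map to an Iterator[Byte]
val bytes: Iterator[Byte] = rows.map(row =&amp;gt; (row + "\n").getBytes).flatten&lt;/CODE&gt;&lt;/PRE&gt;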
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 28 Jul 2016 20:26:10 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29597#M21320</guid>
      <dc:creator>longcao</dc:creator>
      <dc:date>2016-07-28T20:26:10Z</dc:date>
    </item>
    <item>
      <title>Re: Writing DataFrame to PostgreSQL via JDBC extremely slow (Spark 1.6.1)</title>
      <link>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29598#M21321</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;Thanks for the response. I'm running my job with this driver: &lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;postgresql:9.4-1201-jdbc41&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;I have an AWS Redshift service with PostgreSQL behind it. We'll be migrating off Redshift soon, which is why I'm using a direct JDBC connection to write to Postgres. Maybe the Redshift service is causing the problem somehow. Anyway, thanks - I'll keep investigating.&lt;/P&gt;
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 29 Jul 2016 08:17:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/writing-dataframe-to-postgresql-via-jdbc-extremely-slow-spark-1/m-p/29598#M21321</guid>
      <dc:creator>ALincoln</dc:creator>
      <dc:date>2016-07-29T08:17:13Z</dc:date>
    </item>
  </channel>
</rss>

