<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Getting error while loading parquet data into Postgres (using spark-postgres library) ClassNotFoundException: Failed to find data source: postgres. Please find packages at http://spark.apache.org/third-party-projects.html Caused by: ClassNotFoundException in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/getting-error-while-loading-parquet-data-into-postgres-using/m-p/12657#M7429</link>
    <description>&lt;P&gt;Just a lucky guess here... Maybe the spark-postgres library was not correctly installed? Try removing the jar and uploading it once again.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;If that doesn't work, try to replace &lt;B&gt;spark.read.format("postgres")&lt;/B&gt;&lt;/P&gt;&lt;P&gt;with &lt;B&gt;spark.read.format("io.frama.parisni.spark.postgres.DataSource")&lt;/B&gt;&lt;/P&gt;</description>
    <pubDate>Wed, 11 Jan 2023 12:47:52 GMT</pubDate>
    <dc:creator>daniel_sahal</dc:creator>
    <dc:date>2023-01-11T12:47:52Z</dc:date>
    <item>
      <title>Getting error while loading parquet data into Postgres (using spark-postgres library) ClassNotFoundException: Failed to find data source: postgres. Please find packages at http://spark.apache.org/third-party-projects.html Caused by: ClassNotFoundException</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-error-while-loading-parquet-data-into-postgres-using/m-p/12656#M7428</link>
<description>&lt;P&gt;Hi folks, I'm trying to load parquet data (in a GCS location) into a Postgres DB (Google Cloud). For bulk-uploading the data into PG we are using the spark-postgres library:&lt;/P&gt;&lt;P&gt;&lt;A href="https://framagit.org/interhop/library/spark-etl/-/tree/master/spark-postgres/src/main/scala/io/frama/parisni/spark/postgres" alt="https://framagit.org/interhop/library/spark-etl/-/tree/master/spark-postgres/src/main/scala/io/frama/parisni/spark/postgres" target="_blank"&gt;https://framagit.org/interhop/library/spark-etl/-/tree/master/spark-postgres/src/main/scala/io/frama/parisni/spark/postgres&lt;/A&gt;.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Prerequisite: to use the above library, I have uploaded these jars to my cluster:&lt;/P&gt;&lt;P&gt;1- &lt;A href="https://mvnrepository.com/artifact/io.frama.parisni/spark-postgres/0.0.1" alt="https://mvnrepository.com/artifact/io.frama.parisni/spark-postgres/0.0.1" target="_blank"&gt;https://mvnrepository.com/artifact/io.frama.parisni/spark-postgres/0.0.1&lt;/A&gt;&lt;/P&gt;&lt;P&gt;2- &lt;A href="https://mvnrepository.com/artifact/org.postgresql/postgresql" alt="https://mvnrepository.com/artifact/org.postgresql/postgresql" target="_blank"&gt;https://mvnrepository.com/artifact/org.postgresql/postgresql&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;%scala&lt;/P&gt;&lt;P&gt;val data = spark.read.format("postgres")&lt;/P&gt;&lt;P&gt;.option("url","jdbc:postgres://IP:PORT/DATABASE?user=USERNAME&amp;amp;currentSchema=SCHEMANAME")&lt;/P&gt;&lt;P&gt;.option("password","PASSWORD")&lt;/P&gt;&lt;P&gt;.option("query","select * from TABLENAME")&lt;/P&gt;&lt;P&gt;.option("partitions",4)&lt;/P&gt;&lt;P&gt;.option("numSplits",5)&lt;/P&gt;&lt;P&gt;.option("multiline",true)&lt;/P&gt;&lt;P&gt;.load&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;But it gives this error:&lt;/P&gt;&lt;P&gt;ClassNotFoundException: Failed to find data source: postgres. Please find packages at&amp;nbsp;&lt;A href="http://spark.apache.org/third-party-projects.html" alt="http://spark.apache.org/third-party-projects.html" target="_blank"&gt;http://spark.apache.org/third-party-projects.html&lt;/A&gt;&amp;nbsp;Caused by: ClassNotFoundException: postgres.DefaultSource&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;P.S. Since my data is huge I need a bulk-load operation; a plain JDBC connection inserting via cursor batches is not the optimal method.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Let me know if any other methods are available for bulk insert.&lt;/P&gt;&lt;P&gt;Data size ~ 40 GB parquet&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;</description>
      <pubDate>Wed, 11 Jan 2023 12:10:40 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-error-while-loading-parquet-data-into-postgres-using/m-p/12656#M7428</guid>
      <dc:creator>explorer</dc:creator>
      <dc:date>2023-01-11T12:10:40Z</dc:date>
    </item>
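[Editor's note: one detail in the snippet above that is easy to miss, separate from the missing data-source class: the PostgreSQL JDBC driver registers the scheme `jdbc:postgresql://` (full word), so a URL beginning `jdbc:postgres://` will not be accepted even once the format is resolved. A minimal sketch of building a correct URL; the host, database, and parameter values are placeholders, not taken from the thread:]

```python
from urllib.parse import urlencode

def pg_jdbc_url(host, port, database, **params):
    # The PostgreSQL JDBC driver registers "jdbc:postgresql://" (full word);
    # "jdbc:postgres://" as used in the post will not be recognised.
    query = f"?{urlencode(params)}" if params else ""
    return f"jdbc:postgresql://{host}:{port}/{database}{query}"

url = pg_jdbc_url("10.0.0.5", 5432, "mydb", user="USERNAME", currentSchema="SCHEMANAME")
```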
    <item>
      <title>Re: Getting error while loading parquet data into Postgres (using spark-postgres library) ClassNotFoundException: Failed to find data source: postgres. Please find packages at http://spark.apache.org/third-party-projects.html Caused by: ClassNotFoundException</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-error-while-loading-parquet-data-into-postgres-using/m-p/12657#M7429</link>
      <description>&lt;P&gt;Just a lucky guess here... Maybe the spark-postgres library was not correctly installed? Try removing the jar and uploading it once again.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;If that doesn't work, try to replace &lt;B&gt;spark.read.format("postgres")&lt;/B&gt;&lt;/P&gt;&lt;P&gt;with &lt;B&gt;spark.read.format("io.frama.parisni.spark.postgres.DataSource")&lt;/B&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 11 Jan 2023 12:47:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-error-while-loading-parquet-data-into-postgres-using/m-p/12657#M7429</guid>
      <dc:creator>daniel_sahal</dc:creator>
      <dc:date>2023-01-11T12:47:52Z</dc:date>
    </item>
    <item>
      <title>Re: Getting error while loading parquet data into Postgres (using spark-postgres library) ClassNotFoundException: Failed to find data source: postgres. Please find packages at http://spark.apache.org/third-party-projects.html Caused by: ClassNotFoundException</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-error-while-loading-parquet-data-into-postgres-using/m-p/12659#M7431</link>
<description>&lt;P&gt;Hi @Kaniz Fatma​&amp;nbsp;and @Daniel Sahal​, thanks for reaching out; appreciate it.&lt;/P&gt;&lt;P&gt;Some background to help you understand the context.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Problem statement:&lt;/P&gt;&lt;P&gt;We are facing performance issues loading data into Postgres from GCS storage (parquet format). The data load is done in two parts.&lt;/P&gt;&lt;P&gt;First, create a Postgres intermediate_table by reading the GCS data as a Spark dataframe and saving it in Postgres as a table.&lt;/P&gt;&lt;P&gt;Then load from the intermediate table into the main table (with some logic). We have already implemented the code and it works fine for smaller data sets,&lt;/P&gt;&lt;P&gt;but for large datasets it is posing issues.&lt;/P&gt;&lt;P&gt;We ensured there are no constraints on the Postgres intermediate table; there is one PK on the Postgres main table for the merge functionality. Postgres is also in GCP.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Elaborating on the above with code snippets.&lt;/P&gt;&lt;P&gt;Stage 1 -&amp;gt;&lt;/P&gt;&lt;P&gt;Create the Postgres intermediate table (persistent) by writing the Spark dataframe as a table in Postgres (overwrite mode) from the data stored in GCS (parquet format, volume ~25-30 GB, 71 part files in total).&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Code snippet –&lt;/P&gt;&lt;P&gt;df.write.mode("overwrite").option("numPartitions", number).format("jdbc").option("url", url).option("dbtable", dbtable).option("user", user_nm).option("password", passwrd).save()&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Stage 2 -&amp;gt; Insert the Postgres intermediate table (created in stage 1) into the main Postgres table (persistent) via an insert statement (SCD type-1).&lt;/P&gt;&lt;P&gt;Code snippet –&lt;/P&gt;&lt;P&gt;INSERT INTO main_tab (SELECT columns&amp;nbsp;FROM intermediate_table ) ON CONFLICT do something&amp;nbsp;DO UPDATE SET
&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Postgres and Databricks resource details&lt;/P&gt;&lt;P&gt;Screenshots attached.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;span class="lia-inline-image-display-wrapper" image-alt="databricks_resource"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/864i345B1740A946D16A/image-size/large?v=v2&amp;amp;px=999" role="button" title="databricks_resource" alt="databricks_resource" /&gt;&lt;/span&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="postgres_resource"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/867i8F6C5AD842EA80A0/image-size/large?v=v2&amp;amp;px=999" role="button" title="postgres_resource" alt="postgres_resource" /&gt;&lt;/span&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Options tried:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;We used the maxFilesPerTrigger, batchsize, and numPartitions options in the above code to ingest the bulk data.&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Then we saw the spark-postgres option and tried the following.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Option 1 – writing the dataframe with the format given as “postgres”, e.g. write.format("postgres");&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;this failed with: ClassNotFoundException: Failed to find data source: postgres.
Please find packages&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Option 2 – we tried the low-level PySpark API suggested by the spark-postgres library (e.g. sc._jvm.io.frama.parisni.PGUtil), but&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;PGUtil is not available in code either.&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;We tried the PostgreSQL JDBC driver as well but it didn't work out:&lt;/P&gt;&lt;P&gt;&lt;A href="https://docs.databricks.com/external-data/postgresql.html" alt="https://docs.databricks.com/external-data/postgresql.html" target="_blank"&gt;https://docs.databricks.com/external-data/postgresql.html&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;Regards,&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 16 Jan 2023 16:53:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-error-while-loading-parquet-data-into-postgres-using/m-p/12659#M7431</guid>
      <dc:creator>explorer</dc:creator>
      <dc:date>2023-01-16T16:53:33Z</dc:date>
    </item>
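[Editor's note: the stage-2 statement in the post above is truncated ("ON CONFLICT do something DO UPDATE SET"), so the exact conflict action is left as written. For illustration only, a hedged sketch of how such an SCD type-1 upsert statement is typically assembled; the table, column, and key names are hypothetical placeholders, not taken from the thread:]

```python
def upsert_sql(target, source, columns, key):
    # SCD type-1 upsert: insert rows from the intermediate table and, on a
    # primary-key conflict, overwrite the non-key columns with the new values.
    cols = ", ".join(columns)
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c != key)
    return (
        f"INSERT INTO {target} ({cols}) "
        f"SELECT {cols} FROM {source} "
        f"ON CONFLICT ({key}) DO UPDATE SET {updates}"
    )

sql = upsert_sql("main_tab", "intermediate_table", ["id", "name", "qty"], "id")
```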
    <item>
      <title>Re: Getting error while loading parquet data into Postgres (using spark-postgres library) ClassNotFoundException: Failed to find data source: postgres. Please find packages at http://spark.apache.org/third-party-projects.html Caused by: ClassNotFoundException</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-error-while-loading-parquet-data-into-postgres-using/m-p/12660#M7432</link>
      <description>&lt;P&gt;@Deepak U​&amp;nbsp;&lt;/P&gt;&lt;P&gt;It's hard to give an answer here without proper monitoring of the entire process.&lt;/P&gt;&lt;P&gt;First of all, I would check how the workers behave during runtime (Ganglia): what's the overall usage of the workers, and where's the bottleneck? It could even be more efficient to have a smaller number of more powerful nodes than multiple less powerful ones.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Second thing: networking. Maybe there's a bottleneck somewhere in there? Worth checking.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Third thing: the Postgres instance - monitor it during the runtime.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Fourth thing, worth considering: exporting the data to CSV files in storage, then using COPY FROM on Postgres to import them. It's usually faster than using JDBC.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 17 Jan 2023 08:28:59 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-error-while-loading-parquet-data-into-postgres-using/m-p/12660#M7432</guid>
      <dc:creator>daniel_sahal</dc:creator>
      <dc:date>2023-01-17T08:28:59Z</dc:date>
    </item>
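[Editor's note: the COPY FROM route suggested above can be driven from psycopg2 without leaving the Databricks notebook, using `copy_expert` with `COPY ... FROM STDIN`. A minimal sketch; the table, column, and file names are placeholders. One accurate caveat worth knowing: `FORMAT csv` handles quoted fields, which the simpler `cur.copy_from(f, table, sep=",")` text mode does not:]

```python
def copy_stmt(table, columns):
    # COPY ... FROM STDIN streams rows over the existing connection;
    # FORMAT csv handles quoted fields, unlike copy_from's text mode.
    return f"COPY {table} ({', '.join(columns)}) FROM STDIN WITH (FORMAT csv)"

# usage sketch (connection, table, and path are placeholders):
# with open("/dbfs/dir/part-00000.csv") as f:
#     cur.copy_expert(copy_stmt("main_tab", ["id", "name"]), f)
```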
    <item>
      <title>Re: Getting error while loading parquet data into Postgres (using spark-postgres library) ClassNotFoundException: Failed to find data source: postgres. Please find packages at http://spark.apache.org/third-party-projects.html Caused by: ClassNotFoundException</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-error-while-loading-parquet-data-into-postgres-using/m-p/12662#M7434</link>
      <description>&lt;P&gt;Hi @Kaniz Fatma​&amp;nbsp;, @Daniel Sahal​&amp;nbsp;-&lt;/P&gt;&lt;P&gt;A few updates from my side.&lt;/P&gt;&lt;P&gt;After many hits and trials, &lt;B&gt;psycopg2&lt;/B&gt; worked out in my case.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;We can process 200+ GB of data with a 10-node cluster (n2-highmem-4, 32 GB memory, 4 cores) and a driver with 32 GB memory, 4 cores, on Runtime 10.4.x-scala2.12.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;It took close to 100 min to load the whole data set into PG.&lt;/P&gt;&lt;P&gt;&lt;I&gt;&lt;U&gt;Though we needed to convert the parquet to CSV and read the CSV files sequentially to load into PG,&lt;/U&gt;&lt;/I&gt;&lt;/P&gt;&lt;P&gt;as in the below code snippet:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;import psycopg2&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;# Create a connection to the PostgreSQL database&lt;/P&gt;&lt;P&gt;con = psycopg2.connect(database="dbname", user="user-nm", password="password", host="ip", port="5432")&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;# Create a cursor object&lt;/P&gt;&lt;P&gt;cur = con.cursor()&lt;/P&gt;&lt;P&gt;# List the CSV files to load&lt;/P&gt;&lt;P&gt;files_to_load = dbutils.fs.ls("dbfs:/dir/")&lt;/P&gt;&lt;P&gt;while files_to_load:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;file_path = files_to_load.pop(0).path&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;if file_path.endswith('.csv'):&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;file_path = file_path.replace("dbfs:", "/dbfs")&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;print(file_path)&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;with open(file_path, "r") as f:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;# Use copy_from to load the data into the PostgreSQL table&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;cur.copy_from(f, "pg_table_name", sep=",")&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;# Commit the changes&lt;/P&gt;&lt;P&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;con.commit()&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;# Close the cursor and connection&lt;/P&gt;&lt;P&gt;con.close()&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;A few questions, if you can assist:&lt;/P&gt;&lt;P&gt;1- Is there any way to tweak the code to read parquet data instead of .csv in the above code? (Our data is originally in parquet format (Databricks Delta table); if we could use the same native format, we could cut out the extra processing.)&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;2- Is there any way to run the loads in parallel instead of sequentially?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;3- Any other tips that could boost the performance and reduce the time?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;</description>
      <pubDate>Wed, 18 Jan 2023 15:44:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-error-while-loading-parquet-data-into-postgres-using/m-p/12662#M7434</guid>
      <dc:creator>explorer</dc:creator>
      <dc:date>2023-01-18T15:44:11Z</dc:date>
    </item>
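[Editor's note: on questions 1 and 2 above, one way to skip the parquet-to-CSV file conversion is to serialize rows into an in-memory CSV buffer and stream that to Postgres. A hedged sketch: the rows could come straight from `pyarrow.parquet.ParquetFile(path).iter_batches()` (assuming pyarrow is available on the cluster), and `load_one_file` below is a hypothetical per-file worker, not code from the thread:]

```python
import csv
import io

def rows_to_csv_buffer(rows):
    # Serialize rows into an in-memory CSV buffer that cur.copy_expert
    # (or copy_from) can stream, skipping the intermediate .csv files.
    buf = io.StringIO()
    csv.writer(buf).writerows(rows)
    buf.seek(0)
    return buf

# Parallel sketch for question 2: one worker per file, each opening its own
# psycopg2 connection (cursors must not be shared across threads):
# from concurrent.futures import ThreadPoolExecutor
# ThreadPoolExecutor(max_workers=4).map(load_one_file, file_paths)
```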
  </channel>
</rss>

