Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to use multi-threading and batch inserts for large UPSERT to PostgreSQL from Databricks?

Saikumar_Manne
New Contributor II

Hi everyone,

We have a Databricks (Unity Catalog) pipeline where we process large datasets in Spark and need to load incremental data into a PostgreSQL target table.

Our scenario is:

Initial full load (~300 million rows) to PostgreSQL using bulk COPY is relatively fast (few hours)

Daily incremental load volume is ~190 million (19 crore) rows

Target table has a primary key and requires merge/upsert logic

Requirement is to use PostgreSQL INSERT ... ON CONFLICT DO UPDATE for incremental loads (not just inserts)

The challenge is that performing UPSERT/merge from Databricks to PostgreSQL at this scale is extremely slow due to index lookups, transaction size, and database I/O.

We are exploring a design where Spark DataFrame (from Databricks) is written to PostgreSQL using parallelism (e.g., foreachPartition) and then merged into the final table.

Our main questions:

What is the recommended approach to perform high-volume UPSERT (100M+ rows) from Databricks to PostgreSQL?

Can multi-threading / parallel writes (using Spark partitions) significantly improve performance for INSERT ON CONFLICT DO UPDATE?

What is the best batching strategy for large merges into PostgreSQL (optimal batch size, transaction size, etc.)?

Is it better to:

Directly upsert from Spark DataFrame, or

Load into a staging table via COPY and run a server-side merge in PostgreSQL?

Are there any best practices for handling large indexed target tables during bulk merge (e.g., connection pooling, partitioned loads, or tuning settings)?

For datasets in the range of ~100–200 million incremental rows, is PostgreSQL UPSERT considered a scalable pattern, or are alternative architectures typically recommended?

Any guidance, real-world experiences, or performance recommendations for large-scale Databricks → PostgreSQL merge/upsert workloads would be very helpful.


4 REPLIES

aleksandra_ch
Databricks Employee

Hi @Saikumar_Manne ,

As you stated, looks like the main bottleneck is on the Postgres side.

You can control the write parallelism with the numPartitions option; see the Spark JDBC documentation for details. foreachPartition is not well suited to your scenario.

On the other hand, if most of the rows in the Postgres table are updated during the daily incremental refresh, it makes sense to replace the whole table daily (with a COPY) instead of doing an UPSERT. You can perform the merge logic with AUTO CDC in Databricks and simply copy the final result to the Postgres table. Also, create the index on the primary key after the bulk insert; it will be faster.
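To illustrate the "index after the bulk insert" point, a daily full reload might be framed as the sketch below. The table, constraint, and column names are placeholders, and the exact statement sequence is my own illustration, not something stated in this thread:

```python
# Sketch of "reload daily, index afterwards": drop the PK so the bulk load
# is an unindexed append, COPY the data, then rebuild the index.
# Table/column/constraint names are placeholders.

def full_refresh_plan(table="final_table", pk="id"):
    """Return (before_load, after_load) statement lists for a daily full
    reload where the primary-key index is rebuilt only after the COPY."""
    before_load = [
        f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {table}_pkey",
        f"TRUNCATE TABLE {table}",
    ]
    # ...the bulk COPY from Databricks runs between these two phases...
    after_load = [
        f"ALTER TABLE {table} ADD PRIMARY KEY ({pk})",
        f"ANALYZE {table}",  # refresh planner statistics after the reload
    ]
    return before_load, after_load
```

Rebuilding the index once over sorted, fully loaded data is typically much cheaper than maintaining it row by row during the load.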

Also, did you check Lakebase, a fully managed, cloud-native PostgreSQL database on top of your lakehouse tables? You can expose your UC table (the result of the AUTO CDC) via a Lakebase Postgres instance without copying the data at all.

Hope it helps.

Best regards,

 

Hi @aleksandra_ch,

Thanks for the response; this is very helpful.

Just to clarify our setup: we generate ~190M incremental rows in Databricks and bulk load them into a staging/temp table in PostgreSQL (via COPY from Spark). That part is relatively fast. The real bottleneck is the next step where we run:

INSERT INTO final_table
SELECT * FROM temp_table
ON CONFLICT (pk) DO UPDATE ...

This server-side merge on the indexed final table is what takes 15–20+ hours.

Regarding numPartitions, my understanding is it mainly improves parallelism for the write from Spark to Postgres. In our case, the staging load is already acceptable, but the slowdown happens during the Postgres ON CONFLICT merge itself. So would increasing numPartitions still help meaningfully if the main cost is the database-side upsert and index lookups?

Your suggestion about replacing the whole table daily instead of large-scale UPSERT actually aligns with what we are considering, since a large portion of rows get refreshed each cycle. A truncate + bulk COPY of the final dataset may be more efficient than massive ON CONFLICT updates.

Also, good point about creating the primary key index after bulk insert — currently the index exists before the merge, which likely increases I/O.

One quick follow-up: for 100M+ row increments, would it generally be more scalable to compute the final merged result in Databricks (CDC/Delta) and then do a single bulk overwrite (COPY) to Postgres, instead of performing large ON CONFLICT merges inside Postgres?

Hi @Saikumar_Manne,

Yes, you are right that numPartitions will not help if the bottleneck is on the Postgres side.

AUTO CDC is a very scalable solution that can easily accommodate 100M+ row increments, and I do think it would be faster than the current approach. Some best practices to speed up AUTO CDC:

  • Set up liquid clustering on the primary keys in the target table of the AUTO CDC - this will speed up the merge operation
  • Process data incrementally with auto loader or reading the table as stream
  • Use serverless for better scalability
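As an illustration of the first bullet, the liquid-clustering setup might look like the sketch below. The catalog, table, and key names are placeholders; on Databricks you would execute each statement with spark.sql():

```python
# Illustrative Databricks SQL for the liquid-clustering bullet above.
# Catalog/table/column names are placeholders.

TARGET_TABLE = "main.silver.final_table"  # hypothetical UC target of AUTO CDC
MERGE_KEYS = ["id"]                       # primary key column(s) of the merge

def clustering_statements(table=TARGET_TABLE, keys=MERGE_KEYS):
    """Return the DDL that clusters the target on its merge keys so the
    upsert can prune files instead of scanning the whole table."""
    cols = ", ".join(keys)
    return [
        f"ALTER TABLE {table} CLUSTER BY ({cols})",
        # Re-cluster data already in the table; new writes follow the layout.
        f"OPTIMIZE {table}",
    ]
```

Clustering on the merge keys lets the engine skip files whose key ranges cannot contain the incoming changes, which is where most of the merge time goes on large targets.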

Best regards,

SteveOstrowski
Databricks Employee

Hi @Saikumar_Manne,

With 190M+ daily rows going into PostgreSQL via INSERT ON CONFLICT DO UPDATE, there are several levers to pull. Here is a breakdown of the approaches and tuning options.

APPROACH 1: STAGING TABLE + MERGE (RECOMMENDED FOR THIS VOLUME)

At 190M rows, writing directly with INSERT ON CONFLICT row-by-row through JDBC is going to be slow because each row requires a conflict check against the index. The staging table pattern avoids this bottleneck:

1. Bulk-load your incremental data into a temporary staging table in PostgreSQL (no constraints, no indexes).
2. Run a single SQL MERGE/UPSERT statement inside PostgreSQL to move data from staging into the target table.

From Databricks, this looks like:

# Step 1: Write to a staging table (fast parallel bulk insert, no conflict checks)
jdbc_url = "jdbc:postgresql://<host>:5432/<database>"
connection_properties = {
  "user": "<username>",
  "password": "<password>",
  "driver": "org.postgresql.Driver"
}

(df.repartition(16)
 .write
 .format("jdbc")
 .option("url", jdbc_url)
 .option("dbtable", "staging_table")
 .options(**connection_properties)
 .option("batchsize", 10000)
 .option("truncate", "true")
 .mode("overwrite")
 .save()
)

# Step 2: Run the upsert inside PostgreSQL
# (requires the jaydebeapi package on the cluster)
import jaydebeapi

conn = jaydebeapi.connect(
  "org.postgresql.Driver",
  jdbc_url,
  ["<username>", "<password>"]
)
cursor = conn.cursor()
cursor.execute("""
  INSERT INTO target_table (id, col1, col2, col3)
  SELECT id, col1, col2, col3
  FROM staging_table
  ON CONFLICT (id)
  DO UPDATE SET
      col1 = EXCLUDED.col1,
      col2 = EXCLUDED.col2,
      col3 = EXCLUDED.col3
""")
conn.commit()
cursor.close()
conn.close()

Alternatively, you can use the PostgreSQL JDBC driver that is already on the cluster by going through the JVM's java.sql.DriverManager:

driver_manager = spark._jvm.java.sql.DriverManager
connection = driver_manager.getConnection(jdbc_url, "<username>", "<password>")
statement = connection.createStatement()
statement.executeUpdate("""
  INSERT INTO target_table (id, col1, col2, col3)
  SELECT id, col1, col2, col3
  FROM staging_table
  ON CONFLICT (id)
  DO UPDATE SET
      col1 = EXCLUDED.col1,
      col2 = EXCLUDED.col2,
      col3 = EXCLUDED.col3
""")
statement.close()
connection.close()

This is significantly faster because the staging table write is a simple append (no conflict resolution), and the upsert runs entirely inside PostgreSQL as a single set-based operation.
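If the staging write itself ever becomes the slow step, PostgreSQL's COPY protocol is usually faster than batched JDBC INSERTs. A minimal sketch using psycopg2's copy_expert from each Spark partition follows; the host, credentials, table, and column names are placeholders, and psycopg2 must be installed on the cluster:

```python
import csv
import io

def rows_to_csv_buffer(rows):
    """Serialize an iterable of value tuples into an in-memory CSV buffer
    that COPY ... FROM STDIN can consume."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    for row in rows:
        writer.writerow(row)
    buf.seek(0)
    return buf

def copy_partition(rows):
    # Import inside the function so it resolves on the Spark workers.
    import psycopg2

    conn = psycopg2.connect(
        host="<host>", port=5432, dbname="<database>",
        user="<username>", password="<password>"
    )
    cursor = conn.cursor()
    buf = rows_to_csv_buffer((r.id, r.col1, r.col2, r.col3) for r in rows)
    cursor.copy_expert(
        "COPY staging_table (id, col1, col2, col3) FROM STDIN WITH (FORMAT csv)",
        buf,
    )
    conn.commit()
    cursor.close()
    conn.close()

# df.repartition(16).foreachPartition(copy_partition)
```

COPY streams rows through a single protocol message per buffer instead of one INSERT per batch, so it tends to saturate the database's ingest path with far less overhead.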

APPROACH 2: DIRECT JDBC WRITE WITH foreachPartition (PARALLEL UPSERTS)

If you want to skip the staging table and write upserts directly, you can use foreachPartition to run INSERT ON CONFLICT from each Spark partition in parallel:

def upsert_partition(rows):
  import psycopg2

  conn = psycopg2.connect(
      host="<host>",
      port=5432,
      dbname="<database>",
      user="<username>",
      password="<password>"
  )
  cursor = conn.cursor()

  batch = []
  batch_size = 5000

  insert_sql = """
      INSERT INTO target_table (id, col1, col2, col3)
      VALUES (%s, %s, %s, %s)
      ON CONFLICT (id)
      DO UPDATE SET
          col1 = EXCLUDED.col1,
          col2 = EXCLUDED.col2,
          col3 = EXCLUDED.col3
  """

  for row in rows:
      batch.append((row.id, row.col1, row.col2, row.col3))
      if len(batch) >= batch_size:
          cursor.executemany(insert_sql, batch)
          conn.commit()
          batch = []

  if batch:
      cursor.executemany(insert_sql, batch)
      conn.commit()

  cursor.close()
  conn.close()

# Repartition to control parallelism, then write
df.repartition(16).foreachPartition(upsert_partition)

Note: you will need psycopg2 installed on the cluster. You can install it via the cluster Libraries UI or with %pip install psycopg2-binary in your notebook.

For even faster batch inserts, you can use psycopg2's execute_values instead of executemany:

from psycopg2.extras import execute_values

def upsert_partition_fast(rows):
  import psycopg2
  from psycopg2.extras import execute_values

  conn = psycopg2.connect(
      host="<host>", port=5432, dbname="<database>",
      user="<username>", password="<password>"
  )
  cursor = conn.cursor()

  batch = []
  batch_size = 5000

  for row in rows:
      batch.append((row.id, row.col1, row.col2, row.col3))
      if len(batch) >= batch_size:
          execute_values(
              cursor,
              """INSERT INTO target_table (id, col1, col2, col3)
                 VALUES %s
                 ON CONFLICT (id)
                 DO UPDATE SET
                     col1 = EXCLUDED.col1,
                     col2 = EXCLUDED.col2,
                     col3 = EXCLUDED.col3""",
              batch,
              page_size=batch_size
          )
          conn.commit()
          batch = []

  if batch:
      execute_values(cursor,
          """INSERT INTO target_table (id, col1, col2, col3)
             VALUES %s
             ON CONFLICT (id)
             DO UPDATE SET
                 col1 = EXCLUDED.col1,
                 col2 = EXCLUDED.col2,
                 col3 = EXCLUDED.col3""",
          batch, page_size=batch_size)
      conn.commit()

  cursor.close()
  conn.close()

df.repartition(16).foreachPartition(upsert_partition_fast)

execute_values sends multiple rows in a single INSERT statement, which is substantially faster than executemany (which sends one INSERT per row).
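The accumulate-and-flush loop that both partition functions repeat can be factored into a small helper; a sketch (the helper name is my own):

```python
from itertools import islice

def batches(rows, size):
    """Yield lists of at most `size` items from any iterable without
    materializing the whole input: the same accumulate-and-flush logic
    as the loops above, in reusable form."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

# Inside a partition function this replaces the manual batch list:
# for chunk in batches(rows, 5000):
#     execute_values(cursor, insert_sql, chunk, page_size=len(chunk))
#     conn.commit()
```

This keeps the flush-the-remainder logic in one place instead of duplicating it at the end of every partition function.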

KEY TUNING PARAMETERS

1. Number of partitions (repartition): Controls how many parallel connections hit PostgreSQL simultaneously. Start with 8-16 and increase gradually. Going above 50 can overwhelm PostgreSQL, so monitor your database's connection pool and CPU during testing.

2. Batch size: For the foreachPartition approach, 5,000-10,000 rows per batch is a good starting point. For Spark's built-in JDBC writer, set .option("batchsize", 10000).

3. PostgreSQL server-side tuning for large upserts:

 - Increase work_mem temporarily for the session running the upsert
 - Ensure the conflict target column(s) are backed by a unique index (a primary key constraint creates one automatically)
 - Consider disabling autovacuum on the target table during the load and running VACUUM ANALYZE afterward
 - If using the staging table approach, set maintenance_work_mem higher for the upsert operation

4. Connection pooling: If you scale beyond 16 partitions, consider putting PgBouncer or a similar connection pooler in front of PostgreSQL to avoid overwhelming it with concurrent connections.
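As an illustration of point 3, the session-level settings can be applied on the same connection just before the staging-to-target merge. The specific values below are placeholders to size against your instance, not tuned recommendations from this thread:

```python
# Illustrative session settings for the connection that runs the big upsert.
# The values are placeholders; size them to your instance's memory.
TUNING_STATEMENTS = [
    "SET work_mem = '256MB'",            # per-operation sort/hash memory
    "SET maintenance_work_mem = '1GB'",  # speeds up index (re)builds
    "SET synchronous_commit = off",      # fewer WAL flushes during the load
]

def apply_session_tuning(cursor, statements=TUNING_STATEMENTS):
    """Run the SET statements on an open cursor before the merge."""
    for stmt in statements:
        cursor.execute(stmt)
```

SET only affects the current session, so these changes expire with the connection and do not alter server-wide configuration.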

WHICH APPROACH TO CHOOSE

For 190M rows daily, the staging table approach (Approach 1) is the strongest recommendation. Reasons:
- The bulk load into the staging table is fully parallel and does not pay conflict-resolution costs per row.
- The single INSERT ON CONFLICT from staging to target runs as a set operation inside PostgreSQL, which is far more efficient than row-by-row or batch-by-batch upserts over JDBC.
- Easier to make idempotent and retry-safe.

The foreachPartition approach (Approach 2) works well for smaller incremental loads (under 10-20M rows), or if you cannot create a staging table.

For reference, the Databricks JDBC documentation covers the basic write options:
https://docs.databricks.com/en/connect/external-systems/jdbc.html

* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.

If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.