
How can I insert into 2 tables within one database transaction with spark SQL / pyspark?

thomasthomas
New Contributor II

Hi all,

I have a postgres database that contains two tables: A and B.

Also, I have 2 delta tables, called C and D. My task is to push the data from A to C and from B to D - and if something fails, leave everything as is.

With Python this is easy: set up the connection, create a cursor, push all the data into the DB, and commit at the end. Then close the cursor and connection.
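
For reference, roughly the pattern I mean (the connection details and table/column names below are placeholders):

import psycopg2

# Placeholder connection details
conn = psycopg2.connect(host="...", dbname="...", user="...", password="...")
try:
    with conn.cursor() as cur:
        # Both inserts run inside the same transaction
        cur.execute("INSERT INTO a (id, val) VALUES (%s, %s)", (1, "x"))
        cur.execute("INSERT INTO b (id, val) VALUES (%s, %s)", (2, "y"))
    conn.commit()  # nothing becomes visible until this commit
except Exception:
    conn.rollback()  # on failure, leave everything as is
    raise
finally:
    conn.close()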

With PySpark/Spark SQL this is not trivial. It looks like Spark commits after each insert operation, which is not ideal because I don't want to leave any mess behind if something fails.

An alternative solution is to maintain a temporary schema: push all the data to the temp schema first, then open a Postgres connection to move it into the final tables in one transaction. That way, if something fails in the middle, the final tables remain clean.
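
A rough sketch of the first half of that idea (the JDBC URL, credentials, and staging table names are placeholders):

# Spark pushes each Delta table to a staging schema via JDBC; these writes
# are not transactional, but a failure here never touches the final tables
jdbc_url = "jdbc:postgresql://host:5432/mydb"  # placeholder
props = {"user": "...", "password": "...", "driver": "org.postgresql.Driver"}

spark.table("catalog.schema.c").write.jdbc(jdbc_url, "staging.a", mode="overwrite", properties=props)
spark.table("catalog.schema.d").write.jdbc(jdbc_url, "staging.b", mode="overwrite", properties=props)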

Please advise.

4 REPLIES

daniel_sahal
Esteemed Contributor

@thomasthomas 
What I would do is use the RESTORE command to roll back in case of a failure.
It would work like this:

from pyspark.sql.functions import max as _max, col

tgt_table_name = "catalog.schema.tbl_name"

# Get the current version of the target Delta table
ver_df = (
    spark.sql(f"DESCRIBE HISTORY {tgt_table_name}")
         .select(_max(col("version")).alias("version"))
)

tbl_ver = ver_df.collect()[0].version

try:
    # Your code to transfer data here
    pass

except Exception:
    # Roll the table back to the version captured before the load
    spark.sql(f"RESTORE TABLE {tgt_table_name} TO VERSION AS OF {tbl_ver}")
    raise Exception(f"Load of {tgt_table_name} failed. Restored to version {tbl_ver}")
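
Note that DESCRIBE HISTORY and RESTORE work on one table at a time, so with two target Delta tables you'd capture a version for each before the load and restore both in the except block.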

 

As I described above, I am trying to write the content of 2 delta tables to 2 Postgres tables with an insert statement, using either Spark SQL or PySpark.

RESTORE TABLE ... VERSION AS OF and DESCRIBE HISTORY are only valid statements when you work with a Delta table. They don't work against the Postgres tables.

@Anonymous @daniel_sahal 

@thomasthomas 
Ah, sorry. I've misunderstood your question.

In that case, the approach you describe is a good way to do it: set up something like "staging" tables and push the data there. Once everything is done, merge it into the actual tables.
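
A minimal sketch of that final step, assuming the staging tables already hold the staged data and psycopg2 is available (schema and table names are placeholders, and the final tables are assumed to have matching columns):

import psycopg2

conn = psycopg2.connect(host="...", dbname="mydb", user="...", password="...")
try:
    with conn.cursor() as cur:
        # Move the staged rows into the final tables in a single transaction
        cur.execute("INSERT INTO public.a SELECT * FROM staging.a")
        cur.execute("INSERT INTO public.b SELECT * FROM staging.b")
    conn.commit()  # atomic: either both tables are updated or neither is
except Exception:
    conn.rollback()  # a failure leaves the final tables untouched
    raise
finally:
    conn.close()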

Anonymous
Not applicable

Hi @thomasthomas 

We haven't heard from you since the last response from @daniel_sahal, and I was checking back to see if their suggestions helped you.

Otherwise, if you have found a solution, please share it with the community, as it can be helpful to others.

Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.
