Hi,
Thanks for your question!
What I'm doing is essentially loading a table from PostgreSQL using a Spark JDBC connection, and also reading the corresponding table from Databricks. I then perform delete, update, and insert operations by comparing the two datasets.
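For reference, here is a simplified sketch of how the two DataFrames and the join condition are built (the JDBC options, the catalog/schema names and the exact contents of info_logic / connection_properties are placeholders here, not the real job configuration):

# Simplified sketch of the inputs used by the snippets below (names are illustrative)
from functools import reduce

postgres_df = (
    spark.read.format("jdbc")
    .option("url", connection_properties["url"])          # e.g. jdbc:postgresql://host:5432/db
    .option("dbtable", table_name)
    .option("user", connection_properties["user"])
    .option("password", connection_properties["password"])
    .load()
)

source_df = spark.read.table(f"my_catalog.my_schema.{table_name}")  # Databricks side (the source of truth)

primary_keys = [c.strip() for c in info_logic["primary_keys"].split(",")]
join_condition = reduce(
    lambda a, b: a & b,
    [source_df[c] == postgres_df[c] for c in primary_keys],
)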
Using join conditions, I check for differences between the tables. If there are any discrepancies, a query is applied to sync the data in PostgreSQL accordingly, like this:
update_cols = [c.strip() for c in info_logic['update_cols'].split(',')]

# Flag a row as changed when a value differs, or a NULL appears on only one side
change_conditions = [
    (
        (source_df[col] != postgres_df[col]) |
        (source_df[col].isNull() & postgres_df[col].isNotNull()) |
        (source_df[col].isNotNull() & postgres_df[col].isNull())
    )
    for col in update_cols
]  # do we also need to update if the NULLs are on the right side, i.e. once the row was first inserted in Databricks?

# Combine all conditions with OR logic to cover any column that changed or is NULL
final_change_condition = change_conditions[0]
for cond in change_conditions[1:]:
    final_change_condition = final_change_condition | cond

changed_records_df = source_df \
    .join(postgres_df, join_condition, "left_outer") \
    .filter(final_change_condition) \
    .select(source_df["*"])  # since this is my source, the target table takes these values

num_rows = changed_records_df.count()
print(f"UPDATE {num_rows} records")

if num_rows > 0:
    update_cols = [col.strip() for col in info_logic['update_cols'].split(",")]
    primary_keys = [col.strip() for col in info_logic['primary_keys'].split(",")]
    update_data = [tuple(row[col] for col in update_cols + primary_keys) for row in changed_records_df.toLocalIterator()]
    update_query = syncer._generate_update_statement(table_name, info_logic['update_cols'], info_logic['primary_keys'])
    syncer._execute_dml(update_query, update_data, connection_properties, "UPDATE", batch_size=BATCH_SIZE)
------------
records_to_delete_df = postgres_df.join(
    source_df,
    join_condition,
    "left_anti"
).select(*[postgres_df[col.strip()] for col in info_logic['primary_keys'].split(",")])

num_rows = records_to_delete_df.count()
print(f"DELETE {num_rows} records")

if num_rows > 0:
    delete_data = [tuple(row) for row in records_to_delete_df.toLocalIterator()]
    delete_query = syncer._generate_delete_statement(table_name, info_logic['primary_keys'])
    syncer._execute_dml(delete_query, delete_data, connection_properties, "DELETE", batch_size=BATCH_SIZE)
--------
new_records_df = source_df.join(
    postgres_df,
    join_condition,
    "left_anti"
).select(source_df["*"])

num_rows = new_records_df.count()
print(f"INSERT {num_rows} records")

if num_rows > 0:
    all_columns = [col.strip() for col in info_logic['primary_keys'].split(",")]
    if info_logic['update_cols']:
        all_columns.extend([col.strip() for col in info_logic['update_cols'].split(",")])
    insert_data = [tuple(row[col] for col in all_columns) for row in new_records_df.toLocalIterator()]
    insert_query = syncer._generate_insert_statement(table_name, all_columns)
    syncer._execute_dml(insert_query, insert_data, connection_properties, "INSERT", batch_size=BATCH_SIZE)
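For completeness, the syncer helpers just build a parameterized statement and run it in batches against PostgreSQL. A simplified, hypothetical version (assuming a psycopg2-style connection; this is not the actual implementation) looks like this:

import psycopg2

def _generate_update_statement(table, update_cols, primary_keys):
    # e.g. UPDATE my_table SET a = %s, b = %s WHERE id = %s
    set_clause = ", ".join(f"{c.strip()} = %s" for c in update_cols.split(","))
    where_clause = " AND ".join(f"{c.strip()} = %s" for c in primary_keys.split(","))
    return f"UPDATE {table} SET {set_clause} WHERE {where_clause}"

def _execute_dml(query, data, connection_properties, op, batch_size=1000):
    # data is a list of parameter tuples; commit once per batch
    conn = psycopg2.connect(
        host=connection_properties["host"],
        dbname=connection_properties["dbname"],
        user=connection_properties["user"],
        password=connection_properties["password"],
    )
    try:
        with conn.cursor() as cur:
            for i in range(0, len(data), batch_size):
                cur.executemany(query, data[i:i + batch_size])
                conn.commit()
    finally:
        conn.close()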
But this code works properly when I run it interactively in a notebook, not when it runs as a job...