jeremy98
Honored Contributor

 

Hi,

Thanks for your question!
What I'm doing is essentially loading a table from PostgreSQL using a Spark JDBC connection, and also reading the corresponding table from Databricks. I then perform delete, update, and insert operations by comparing the two datasets.

Using join conditions, I check for differences between the tables. If there are any discrepancies, a query is applied to sync the data in PostgreSQL accordingly—like this:

    # UPDATE step: find rows that exist on both sides but whose tracked columns
    # differ, then push the source (Databricks) values to PostgreSQL.
    from functools import reduce
    from operator import or_

    # Parse the column lists once, stripping whitespace so that both "a,b" and
    # "a, b" work, and reuse the same lists for the comparison and the payload.
    raw_update_cols = info_logic['update_cols']
    update_cols = [name.strip() for name in raw_update_cols.split(",")] if raw_update_cols and raw_update_cols.strip() else []
    primary_keys = [name.strip() for name in info_logic['primary_keys'].split(",")]

    num_rows = 0
    if update_cols:
        # eqNullSafe treats NULL/NULL as equal and NULL/value as different, so its
        # negation flags a real value change as well as a NULL on exactly one side
        # (including a NULL on the PostgreSQL side with a value in the source).
        change_conditions = [~source_df[name].eqNullSafe(postgres_df[name]) for name in update_cols]

        # A row needs an update as soon as any tracked column differs.
        final_change_condition = reduce(or_, change_conditions)

        # Inner join: only rows present on both sides can be updated. With a left
        # outer join, source rows without a PostgreSQL match would also pass the
        # filter (all PostgreSQL columns are NULL for them) and produce UPDATE
        # statements that match nothing.
        changed_records_df = (
            source_df
            .join(postgres_df, join_condition, "inner")
            .filter(final_change_condition)
            .select(source_df["*"])  # the source holds the values to write
        )
        num_rows = changed_records_df.count()

    print(f"UPDATE {num_rows} records")
    if num_rows > 0:
        # Parameter order per row: updated columns first, then the primary keys.
        update_data = [tuple(row[name] for name in update_cols + primary_keys) for row in changed_records_df.toLocalIterator()]
        update_query = syncer._generate_update_statement(table_name, info_logic['update_cols'], info_logic['primary_keys'])

        syncer._execute_dml(update_query, update_data, connection_properties, "UPDATE", batch_size=BATCH_SIZE)

------------

    # DELETE step: rows that exist in PostgreSQL but have no match in the source.
    # A left anti join keeps only the unmatched PostgreSQL rows; just the primary
    # key columns are selected from them.
    pk_columns = [postgres_df[key.strip()] for key in info_logic['primary_keys'].split(",")]
    orphaned_rows_df = postgres_df.join(source_df, join_condition, "left_anti")
    records_to_delete_df = orphaned_rows_df.select(*pk_columns)

    num_rows = records_to_delete_df.count()
    print(f"DELETE {num_rows} records")
    if num_rows > 0:
        # One tuple of primary-key values per row to delete.
        delete_data = list(map(tuple, records_to_delete_df.toLocalIterator()))
        delete_query = syncer._generate_delete_statement(table_name, info_logic['primary_keys'])
        syncer._execute_dml(
            delete_query,
            delete_data,
            connection_properties,
            "DELETE",
            batch_size=BATCH_SIZE,
        )

--------

    # INSERT step: rows that exist in the source but have no match in PostgreSQL.
    unmatched_source_df = source_df.join(postgres_df, join_condition, "left_anti")
    new_records_df = unmatched_source_df.select(source_df["*"])

    num_rows = new_records_df.count()
    print(f"INSERT {num_rows} records")
    if num_rows > 0:
        # Column order for the statement: primary keys first, then the update
        # columns when any are configured.
        all_columns = [key.strip() for key in info_logic['primary_keys'].split(",")]
        if info_logic['update_cols']:
            all_columns += [name.strip() for name in info_logic['update_cols'].split(",")]

        # One tuple per new row, with values in the same order as all_columns.
        insert_data = [
            tuple(record[name] for name in all_columns)
            for record in new_records_df.toLocalIterator()
        ]
        insert_query = syncer._generate_insert_statement(table_name, all_columns)

        syncer._execute_dml(
            insert_query,
            insert_data,
            connection_properties,
            "INSERT",
            batch_size=BATCH_SIZE,
        )

However, this code works properly when I run it interactively in a notebook, but not when it runs as a job...