Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Differences between interactive notebooks and notebooks that run inside a job

jeremy98
Contributor III

Hello Community,

I'm facing an issue with a job that runs a notebook task. When I run the same join condition through the job pipeline, it produces different results compared to running the notebook interactively (outside the job).

Why might this be happening? Could there be differences in how timestamp columns are handled between jobs and interactive notebook runs?
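
If it helps, here is a minimal sketch of the session settings I would compare between the interactive cluster and the job cluster, since they can affect how timestamps are parsed and converted (assuming PySpark and the usual spark object in both environments):

    # Sketch: run this in the interactive notebook and in the job run, then diff the output
    print("Spark version:", spark.version)
    for key in [
        "spark.sql.session.timeZone",        # session time zone used for timestamp conversions
        "spark.sql.legacy.timeParserPolicy", # legacy vs. new date/time parsing behaviour
    ]:
        print(key, "=", spark.conf.get(key, "<not set>"))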

3 REPLIES

holly
Databricks Employee

This is very interesting (and unexpected) - can you share more about what the job is doing, your configs, and what differences you're seeing in the results?

jeremy98
Contributor III

 

Hi,

Thanks for your question!
What I'm doing is essentially loading a table from PostgreSQL using a Spark JDBC connection, and also reading the corresponding table from Databricks. I then perform delete, update, and insert operations by comparing the two datasets.
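
For context, the two reads look roughly like this (a sketch only; the host, credentials, and table names are placeholders, not my real values):

    # Sketch of the two source reads; URL, credentials, and table names are placeholders
    connection_properties = {
        "user": "<pg_user>",
        "password": "<pg_password>",
        "driver": "org.postgresql.Driver",
    }
    # PostgreSQL side, read over the Spark JDBC connection
    postgres_df = spark.read.jdbc(
        url="jdbc:postgresql://<host>:5432/<database>",
        table="public.my_table",
        properties=connection_properties,
    )
    # Databricks side, the corresponding table I compare against
    source_df = spark.table("my_catalog.my_schema.my_table")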

Using join conditions, I check for differences between the tables. If there are any discrepancies, a query is applied to sync the data in PostgreSQL accordingly—like this:

    update_cols = [c.strip() for c in info_logic['update_cols'].split(',')]
    # A row needs an UPDATE when a column value differs, or is NULL on exactly one side
    change_conditions = [
        ((source_df[col] != postgres_df[col]) |
         (source_df[col].isNull() & postgres_df[col].isNotNull()) |
         (source_df[col].isNotNull() & postgres_df[col].isNull()))
        for col in update_cols
    ]  # do we also need to update when the NULL is on the Postgres side, i.e. rows first inserted in Databricks?
    
    final_change_condition = change_conditions[0] # Combine all conditions with "OR" logic to cover any column that changed or is NULL
    for cond in change_conditions[1:]:
        final_change_condition = final_change_condition | cond

    changed_records_df = source_df \
        .join(postgres_df, join_condition, "left_outer") \
        .filter(final_change_condition) \
        .select(source_df["*"]) # keep the Databricks-side (source) values; these are what gets pushed to PostgreSQL
    
    num_rows = changed_records_df.count()

    print(f"UPDATE {num_rows} records")
    if num_rows > 0:
        update_cols = [col.strip() for col in info_logic['update_cols'].split(",")]
        primary_keys = [col.strip() for col in info_logic['primary_keys'].split(",")]

        update_data = [tuple(row[col] for col in update_cols + primary_keys) for row in changed_records_df.toLocalIterator()]
        update_query = syncer._generate_update_statement(table_name, info_logic['update_cols'], info_logic['primary_keys'])

        syncer._execute_dml(update_query, update_data, connection_properties, "UPDATE", batch_size=BATCH_SIZE)

------------

    records_to_delete_df = postgres_df.join(
       source_df,
       join_condition,
       "left_anti"
    ).select(*[postgres_df[col.strip()] for col in info_logic['primary_keys'].split(",")])

    num_rows = records_to_delete_df.count()
    print(f"DELETE {num_rows} records")
    if num_rows > 0:
        delete_data = [tuple(row) for row in records_to_delete_df.toLocalIterator()]
        delete_query = syncer._generate_delete_statement(table_name, info_logic['primary_keys'])
        syncer._execute_dml(delete_query, delete_data, connection_properties, "DELETE", batch_size=BATCH_SIZE)

--------

    new_records_df = source_df.join(
        postgres_df,
        join_condition,
        "left_anti"
    ).select(source_df["*"])

    num_rows = new_records_df.count()
    print(f"INSERT {num_rows} records")
    if num_rows > 0:
        all_columns = [col.strip() for col in info_logic['primary_keys'].split(",")]
        if info_logic['update_cols']:
            all_columns.extend([col.strip() for col in info_logic['update_cols'].split(",")])

        insert_data = [tuple(row[col] for col in all_columns) for row in new_records_df.toLocalIterator()]
        insert_query = syncer._generate_insert_statement(table_name, all_columns)

        syncer._execute_dml(insert_query, insert_data, connection_properties, "INSERT", batch_size=BATCH_SIZE)
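
To make the flow concrete, the syncer helpers are roughly of this shape (a hypothetical sketch written as free functions, not my actual implementation, assuming psycopg2 is available on the driver):

    # Hypothetical sketch of the syncer helpers; connection details are placeholders
    import psycopg2

    def _generate_update_statement(table_name, update_cols, primary_keys):
        # Produces e.g. "UPDATE t SET a = %s, b = %s WHERE id = %s",
        # matching the tuple order update_cols + primary_keys built above
        set_clause = ", ".join(f"{c.strip()} = %s" for c in update_cols.split(","))
        where_clause = " AND ".join(f"{c.strip()} = %s" for c in primary_keys.split(","))
        return f"UPDATE {table_name} SET {set_clause} WHERE {where_clause}"

    def _execute_dml(query, data, connection_properties, op_name, batch_size=1000):
        # Runs the statement against PostgreSQL in batches of batch_size rows
        print(f"{op_name}: executing {len(data)} rows in batches of {batch_size}")
        conn = psycopg2.connect(
            host="<host>", dbname="<database>",          # placeholders
            user=connection_properties["user"],
            password=connection_properties["password"],
        )
        try:
            with conn, conn.cursor() as cur:             # commits on success, rolls back on error
                for i in range(0, len(data), batch_size):
                    cur.executemany(query, data[i:i + batch_size])
        finally:
            conn.close()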

But this code works properly when the notebook runs interactively, not when it runs inside the job...

Sorry, an update: the problem was solved by restarting the cluster, which had been up for 4 days... what does that mean?
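
My guess (not confirmed) is that the long-running cluster was holding on to stale cached data or table metadata. If it happens again I will try clearing the session caches before a full restart, roughly like this (the table name is a placeholder):

    # Sketch: clear cached DataFrames/tables and refresh a table's metadata without restarting
    spark.catalog.clearCache()  # drops everything cached in the current Spark session
    spark.sql("REFRESH TABLE my_catalog.my_schema.my_table")  # placeholder name; re-reads the table's metadata and files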
