Differences between interactive notebook runs and notebooks run inside a job
3 weeks ago
Hello Community,
I'm facing an issue with a job that runs a notebook task. When I run the same join condition through the job pipeline, it produces different results compared to running the notebook interactively (outside the job).
Why might this be happening? Could there be differences in how timestamp columns are handled between jobs and interactive notebook runs?
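For reference, the only environment-level difference I could think of so far is the session time zone and related SQL settings, so I was planning to print them in both contexts and compare the output, something like this (just a sketch, using standard Spark config keys):

# Run this in both the interactive notebook and the job task, then diff the output.
# 'spark' is the active SparkSession in either context.
for key in [
    "spark.sql.session.timeZone",
    "spark.sql.ansi.enabled",
    "spark.sql.legacy.timeParserPolicy",
]:
    print(key, "=", spark.conf.get(key, "<not set>"))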
3 weeks ago
This is very interesting (and unexpected) - can you share more about what the job is doing, your configs, and what differences you're seeing in the results?
2 weeks ago
Hi,
Thanks for your question!
What I'm doing is essentially loading a table from PostgreSQL using a Spark JDBC connection, and also reading the corresponding table from Databricks. I then perform delete, update, and insert operations by comparing the two datasets.
Using join conditions, I check for differences between the tables; if there are any discrepancies, the corresponding queries are applied to sync the data in PostgreSQL.
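For context, the two DataFrames are loaded roughly like this (a simplified sketch; the real JDBC options, table names, and join_condition come from our configuration, so the values below are placeholders):

# Sketch of the two reads; connection details are placeholders
postgres_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://<host>:5432/<database>")
    .option("dbtable", table_name)
    .option("user", connection_properties["user"])
    .option("password", connection_properties["password"])
    .option("driver", "org.postgresql.Driver")
    .load()
)

# Databricks side: the table that acts as the source of truth
source_df = spark.table(databricks_table_name)  # placeholder for the corresponding Databricks table

# join_condition is built from the configured primary keys
primary_keys = [c.strip() for c in info_logic['primary_keys'].split(",")]
join_condition = [source_df[c] == postgres_df[c] for c in primary_keys]

The comparison and sync logic then looks like this: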
update_cols = info_logic['update_cols'].split(', ')

# A row needs an UPDATE if any updatable column differs, including the case
# where the value is NULL on one side only (a plain != is never true with NULLs).
change_conditions = [
    (source_df[col] != postgres_df[col]) |
    (source_df[col].isNull() & postgres_df[col].isNotNull()) |
    (source_df[col].isNotNull() & postgres_df[col].isNull())
    for col in update_cols
]  # do we also need to update when the NULL is only on the PostgreSQL side, i.e. the row was first inserted into Databricks?

# Combine all per-column conditions with OR: any changed column flags the row
final_change_condition = change_conditions[0]
for cond in change_conditions[1:]:
    final_change_condition = final_change_condition | cond

# UPDATE: rows that exist on both sides but differ in at least one column
changed_records_df = source_df \
    .join(postgres_df, join_condition, "left_outer") \
    .filter(final_change_condition) \
    .select(source_df["*"])  # Databricks is the source, so the target in PostgreSQL takes these values

num_rows = changed_records_df.count()
print(f"UPDATE {num_rows} records")
if num_rows > 0:
    update_cols = [col.strip() for col in info_logic['update_cols'].split(",")]
    primary_keys = [col.strip() for col in info_logic['primary_keys'].split(",")]
    update_data = [tuple(row[col] for col in update_cols + primary_keys)
                   for row in changed_records_df.toLocalIterator()]
    update_query = syncer._generate_update_statement(table_name, info_logic['update_cols'], info_logic['primary_keys'])
    syncer._execute_dml(update_query, update_data, connection_properties, "UPDATE", batch_size=BATCH_SIZE)

# DELETE: rows still present in PostgreSQL but no longer in the source table
records_to_delete_df = postgres_df.join(
    source_df,
    join_condition,
    "left_anti"
).select(*[postgres_df[col.strip()] for col in info_logic['primary_keys'].split(",")])

num_rows = records_to_delete_df.count()
print(f"DELETE {num_rows} records")
if num_rows > 0:
    delete_data = [tuple(row) for row in records_to_delete_df.toLocalIterator()]
    delete_query = syncer._generate_delete_statement(table_name, info_logic['primary_keys'])
    syncer._execute_dml(delete_query, delete_data, connection_properties, "DELETE", batch_size=BATCH_SIZE)

# INSERT: rows present in the source table but missing in PostgreSQL
new_records_df = source_df.join(
    postgres_df,
    join_condition,
    "left_anti"
).select(source_df["*"])

num_rows = new_records_df.count()
print(f"INSERT {num_rows} records")
if num_rows > 0:
    all_columns = [col.strip() for col in info_logic['primary_keys'].split(",")]
    if info_logic['update_cols']:
        all_columns.extend([col.strip() for col in info_logic['update_cols'].split(",")])
    insert_data = [tuple(row[col] for col in all_columns)
                   for row in new_records_df.toLocalIterator()]
    insert_query = syncer._generate_insert_statement(table_name, all_columns)
    syncer._execute_dml(insert_query, insert_data, connection_properties, "INSERT", batch_size=BATCH_SIZE)
But this code works properly when I run the notebook interactively, not when it runs as a job...
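To narrow it down, I'm also thinking of persisting the diagnostic counts before any DML is executed, so an interactive run and a job run of the same code can be compared afterwards (a rough sketch; the audit table name is hypothetical):

from pyspark.sql import functions as F

# Snapshot the row counts each branch would touch, before any DML runs
diag = spark.createDataFrame(
    [
        (table_name, "UPDATE", changed_records_df.count()),
        (table_name, "DELETE", records_to_delete_df.count()),
        (table_name, "INSERT", new_records_df.count()),
    ],
    "table_name string, operation string, row_count long",
).withColumn("run_ts", F.current_timestamp())

diag.write.mode("append").saveAsTable("sync_run_diagnostics")  # hypothetical audit table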
2 weeks ago
Sorry, one update: the problem was solved by restarting the cluster, which had been up for about 4 days... what does that mean?
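In case it was stale cached state on the long-running cluster, I'm considering clearing caches at the start of each run as a precaution (just a sketch; 'databricks_table' is a placeholder for the real table name):

# Precaution at the start of each run on a long-lived cluster
spark.catalog.clearCache()                        # drop anything cached by previous runs
spark.sql(f"REFRESH TABLE {databricks_table}")    # invalidate stale file/metadata caches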

