Apply change data with delete and schema evolution
07-28-2022 04:56 AM
Hi,
Currently, I'm using Structured Streaming to insert/update/delete rows in a table. A row is deleted if the value in the 'Operation' column is 'deleted'. Everything seemed to work fine until a new column appeared.
Since I don't need the 'Operation' column in the target table, I use whenMatchedUpdate(set=..) and whenNotMatchedInsert(values=..) instead of whenMatchedUpdateAll() and whenNotMatchedInsertAll(). However, according to the documentation, schema evolution occurs only when there is an updateAll, an insertAll, or both. The 'Operation' column also can't be dropped from the source, since it's needed in the merge (delete) condition.
Is there any way to automatically add new columns and also drop some columns before merging?
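One workaround sketch (my assumption, not from the thread; the table name, key column, and DataFrame variables are hypothetical): split the work into two merges, so that deletes are handled first via whenMatchedDelete, and the upserts can then use updateAll/insertAll (which do trigger schema evolution) on a source with 'Operation' dropped. The Spark calls are commented out; the pure-Python helper just illustrates the split.

```python
# Sketch: two merges so updateAll/insertAll (and thus schema evolution) can
# still be used while honoring the 'Operation' column. Names are hypothetical.

def split_source(rows, op_col="Operation", delete_value="deleted"):
    """Pure-Python stand-in for splitting the source into deletes and upserts.

    Deletes keep the op column (it drives the delete merge); upserts drop it.
    """
    deletes = [r for r in rows if r.get(op_col) == delete_value]
    upserts = [{k: v for k, v in r.items() if k != op_col}
               for r in rows if r.get(op_col) != delete_value]
    return deletes, upserts

# With DataFrames the same split would look like (assumption, not runnable here):
#   deletes = source_df.filter("Operation = 'deleted'")
#   upserts = source_df.filter("Operation <> 'deleted'").drop("Operation")
#   target = DeltaTable.forName(spark, "silver")
#   # 1) delete-only merge, driven by the rows marked as deleted
#   (target.alias("t").merge(deletes.alias("s"), "t.id = s.id")
#          .whenMatchedDelete().execute())
#   # 2) upsert merge; schema evolution applies because of updateAll/insertAll
#   spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
#   (target.alias("t").merge(upserts.alias("s"), "t.id = s.id")
#          .whenMatchedUpdateAll().whenNotMatchedInsertAll().execute())
```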
07-29-2022 04:16 AM
To help in that case, I think I would need to see more detail plus some sample data.
You can also look at Delta Live Tables - there is a new function, apply_changes, which could be excellent in your case: https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-cdc.html
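For reference, a rough sketch of what apply_changes might look like for this case. This only runs inside a Delta Live Tables pipeline (and exact function names vary by runtime version), so treat it as pseudocode; the key name, sequencing column, and table names are hypothetical. The idea is that apply_as_deletes handles the 'deleted' rows and except_column_list keeps Operation out of the target.

```python
# Delta Live Tables sketch - only runs inside a DLT pipeline, names hypothetical.
import dlt
from pyspark.sql.functions import expr

dlt.create_streaming_table("silver")

dlt.apply_changes(
    target = "silver",
    source = "bronze",                        # CDC feed with an 'Operation' column
    keys = ["id"],                            # hypothetical primary key
    sequence_by = expr("event_time"),         # hypothetical ordering column
    apply_as_deletes = expr("Operation = 'deleted'"),
    except_column_list = ["Operation"],       # keep Operation out of the target
)
```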
07-31-2022 08:10 PM
Thank you for your answer. I haven't tried Delta Live Tables yet, but it's in our future plans.
Anyway, the sample data looks something like:
bronze table (screenshot not included)
silver table (screenshot not included)
Then the schema of the bronze table automatically got updated with a new column (a1).
This is the result I want for the silver table (screenshot not included).
Currently, I have to manually update the schema of the silver table.
If I use whenMatchedUpdateAll() and whenNotMatchedInsertAll(), the Op column is also added to the silver table.
If I use whenMatchedUpdate() and whenNotMatchedInsert(), the new column a1 isn't added to the table.
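One hedged idea, not from the thread: newer Delta releases reportedly extend merge schema evolution to explicit update/insert actions that reference source columns by name, when spark.databricks.delta.schema.autoMerge.enabled is set. If that applies to your runtime, the set=/values= maps could be built from the source columns minus Op, so new columns such as a1 flow through automatically. A sketch with hypothetical key and column names; only the map-building helper is runnable here.

```python
# Sketch (assumption: the runtime supports schema evolution with explicit named
# update/insert actions when autoMerge is enabled; names are hypothetical).

def merge_maps(source_columns, exclude=("Op",)):
    """Build the set=/values= dict from the source columns, minus excluded ones."""
    return {c: f"s.{c}" for c in source_columns if c not in exclude}

# In a real job (assumption, not runnable here):
#   spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
#   cols = merge_maps(source_df.columns)
#   (DeltaTable.forName(spark, "silver").alias("t")
#       .merge(source_df.alias("s"), "t.id = s.id")
#       .whenMatchedDelete(condition="s.Op = 'deleted'")
#       .whenMatchedUpdate(set=cols)
#       .whenNotMatchedInsert(values=cols)
#       .execute())
```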
09-01-2022 12:33 AM
Please go through this documentation: https://docs.delta.io/latest/api/python/index.html
09-01-2022 01:41 AM
Thank you for the document. It's very helpful.
From the doc, I thought I would be able to use
deltaTable = (DeltaTable.replace(sparkSession)
    .tableName("testTable")
    .addColumns(df.schema)
    .execute())
to update the schema in the code whenever a schema change is detected.
However, this piece of code really does replace the table, so not only is the schema updated, but all the existing data is also gone.
Do you have any other suggestion?
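One possible alternative to DeltaTable.replace (a sketch, not from the thread; table and column names are hypothetical): diff the source schema against the target and issue ALTER TABLE ... ADD COLUMNS for just the missing columns, which keeps the existing data intact. The helpers below are pure Python; the Spark calls are commented assumptions.

```python
# Sketch: evolve the target schema in place instead of replacing the table.
# Names are hypothetical; only the two helpers run outside Spark.

def new_columns(source_fields, target_fields):
    """(name, type) pairs that exist in the source schema but not the target."""
    have = {name for name, _ in target_fields}
    return [(n, t) for n, t in source_fields if n not in have]

def add_columns_sql(table, cols):
    """Build an ALTER TABLE statement for the missing columns."""
    body = ", ".join(f"{n} {t}" for n, t in cols)
    return f"ALTER TABLE {table} ADD COLUMNS ({body})"

# In a real job (assumption, not runnable here):
#   src = [(f.name, f.dataType.simpleString()) for f in source_df.schema]
#   tgt = [(f.name, f.dataType.simpleString()) for f in spark.table("silver").schema]
#   cols = new_columns(src, tgt)
#   if cols:
#       spark.sql(add_columns_sql("silver", cols))
#   # ...then run the merge with explicit set=/values= maps as before.
```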

