Data Engineering
Apply change data with delete and schema evolution

noimeta
Contributor II

Hi,

Currently, I'm using Structured Streaming to insert/update/delete rows in a table. A row is deleted if the value in the 'Operation' column is 'deleted'. Everything seemed to work fine until a new column appeared.

Since I don't need the 'Operation' column in the target table, I use whenMatchedUpdate(set=..) and whenNotMatchedInsert(values=..) instead of whenMatchedUpdateAll() and whenNotMatchedInsertAll(). However, according to the documentation, schema evolution occurs only when there is an updateAll, an insertAll, or both. The 'Operation' column also can't be dropped from the source, since it's needed in the merge (delete) condition.

Is there any way to automatically add new columns and drop certain columns before merging?
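Since the explicit set=/values= dicts are what pin the column list, one pattern (a sketch only; merge_mappings and the column names are illustrative, not from this thread) is to build those dicts dynamically from the source DataFrame's columns so newly arrived columns are picked up while 'Operation' stays excluded. Note this does not trigger schema evolution by itself: the target table must already have the new column, e.g. added via ALTER TABLE beforehand.

```python
# Hypothetical helper (name and columns are illustrative): build the column
# mappings for whenMatchedUpdate(set=...) / whenNotMatchedInsert(values=...)
# from the source DataFrame's columns, excluding CDC metadata columns.
def merge_mappings(source_columns, exclude=("Operation",)):
    """Map every source column except the excluded ones to 'source.<col>'."""
    return {c: f"source.{c}" for c in source_columns if c not in exclude}

# Example: a source that just gained a new column 'a1'
mappings = merge_mappings(["id", "a", "a1", "Operation"])
# mappings == {"id": "source.id", "a": "source.a", "a1": "source.a1"}
# You would then pass `mappings` to both whenMatchedUpdate(set=mappings)
# and whenNotMatchedInsert(values=mappings).
```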


Hubert-Dudek
Esteemed Contributor III

To help in that case, I think I would need to see more details plus sample data.

You can also implement Delta Live Tables - there is a new function, apply_changes, which could be excellent in your case: https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-cdc.html
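For reference, a minimal sketch of what apply_changes could look like for this case (table, key, and column names are assumptions, not from the thread; this code only runs inside a Delta Live Tables pipeline, not as a standalone script). Notably, except_column_list lets you keep 'Operation' out of the target, and apply_as_deletes handles the delete rows:

```python
import dlt
from pyspark.sql.functions import expr

dlt.create_streaming_table("silver")  # target table - name is illustrative

dlt.apply_changes(
    target="silver",
    source="bronze",                          # CDC feed with the 'Operation' column
    keys=["id"],                              # primary key(s) - assumption
    sequence_by="event_ts",                   # ordering column - assumption
    apply_as_deletes=expr("Operation = 'deleted'"),
    except_column_list=["Operation"],         # drop the CDC column from the target
)
```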

Thank you for your answer. I haven't tried Delta Live Tables yet, but it's in my future plans.

Anyway, the sample data looks something like:

bronze table

[Screenshot: bronze table, 2565-08-01 at 10.02.13]

silver table

[Screenshot: silver table, 2565-08-01 at 10.02.37]

Then, the schema of the bronze table was automatically updated with a new column:

[Screenshot: bronze table with new column, 2565-08-01 at 10.03.30]

This is the result I want for the silver table:

[Screenshot: desired silver table, 2565-08-01 at 10.03.51]

Currently, I have to manually update the schema of the silver table.

If I use whenMatchedUpdateAll() and whenNotMatchedInsertAll(), the Op column will be added to the silver table.

If I use whenMatchedUpdate() and whenNotMatchedInsert(), the column a1 won't be added to the table.

User16753725469
Contributor II

Please go through this documentation: https://docs.delta.io/latest/api/python/index.html

Thank you for the document. It's very helpful.

From the doc, I thought I would be able to use

deltaTable = (
    DeltaTable.replace(sparkSession)
    .tableName("testTable")
    .addColumns(df.schema)
    .execute()
)

to update the schema in the code when some schema change is detected.

Anyway, this piece of code really does replace the table, so not only does the schema get updated, but all the data is also gone.

Do you have any other suggestions?
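A non-destructive alternative (a sketch under assumptions; the helper name and column names are illustrative, not from this thread) is to compare the source and target schemas yourself and emit an ALTER TABLE ... ADD COLUMNS statement, which Delta supports and which adds columns in place without rewriting or losing existing data. The schema-diff logic is plain Python; on Databricks you would run the resulting DDL with spark.sql before the merge.

```python
# Sketch: detect columns present in the source schema but missing from the
# target, excluding CDC metadata (e.g. 'Op'), and build the DDL to add them.
def add_columns_ddl(table, source_fields, target_fields, exclude=("Op",)):
    """source_fields / target_fields: lists of (name, sql_type) pairs."""
    existing = {name for name, _ in target_fields}
    new = [(n, t) for n, t in source_fields
           if n not in existing and n not in exclude]
    if not new:
        return None  # schemas already match; nothing to run
    cols = ", ".join(f"{n} {t}" for n, t in new)
    return f"ALTER TABLE {table} ADD COLUMNS ({cols})"

ddl = add_columns_ddl(
    "silver",
    [("id", "INT"), ("a", "STRING"), ("a1", "STRING"), ("Op", "STRING")],
    [("id", "INT"), ("a", "STRING")],
)
# ddl == "ALTER TABLE silver ADD COLUMNS (a1 STRING)"
# On Databricks, you would then run spark.sql(ddl) before the merge, so the
# explicit whenMatchedUpdate/whenNotMatchedInsert mappings have somewhere
# to write the new column.
```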
