Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Apply change data with delete and schema evolution

noimeta
Contributor III

Hi,

Currently, I'm using Structured Streaming to insert, update, and delete rows in a table. A row is deleted if the value in the 'Operation' column is 'deleted'. Everything seems to work fine until a new column appears in the source.

Since I don't need the 'Operation' column in the target table, I use whenMatchedUpdate(set=..) and whenNotMatchedInsert(values=..) instead of whenMatchedUpdateAll() and whenNotMatchedInsertAll(). However, according to the documentation, schema evolution seems to occur only when there is an updateAll, an insertAll, or both. The 'Operation' column also can't be dropped beforehand, since it's needed in the merge (delete) condition.

Is there any way to automatically add a new column and also drop some columns before merging?

4 REPLIES

Hubert-Dudek
Esteemed Contributor III

To help with this case, I think I would need to see more details and some sample data.

You can also implement Delta Live Tables. There is a new function, apply_changes, which could be excellent in your case: https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-cdc.html

Thank you for your answer. I haven't tried Delta Live Tables yet, but it's in our future plans.

Anyway, the sample data looks something like:

bronze table

[Screenshot: bronze table]

silver table

[Screenshot: silver table]

Then, the schema of the bronze table automatically got updated with a new column:

[Screenshot: bronze table with the new column]

This is the result I want for the silver table:

[Screenshot: desired silver table]

Currently, I have to manually update the schema of the silver table.

If I use whenMatchedUpdateAll() and whenNotMatchedInsertAll(), the Op column will be added to the silver table.

If I use whenMatchedUpdate() and whenNotMatchedInsert(), the column a1 won't be added to the table.
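One possible way to keep the operation column out of the silver table without hard-coding the column list is to build the explicit mapping from the incoming batch's columns. The following is only a minimal sketch: the key column `id`, the aliases `s` and `t`, and the helper names `build_column_mapping` and `merge_batch` are assumptions for illustration, and `Op` with the value 'deleted' is taken from the description in this thread. It also assumes `Op` is never null. Because explicit update/insert clauses do not evolve the schema, the silver table must already contain every mapped column (such as a1) before this runs.

```python
def build_column_mapping(source_columns, exclude=("Op",), source_alias="s"):
    """Map each target column to the same-named source column, skipping `exclude`."""
    return {c: f"{source_alias}.{c}" for c in source_columns if c not in exclude}


def merge_batch(delta_table, batch_df, key="id", op_col="Op", deleted_value="deleted"):
    """Apply one micro-batch to `delta_table` (expected: a delta.tables.DeltaTable).

    `key`, `op_col` and `deleted_value` are assumed names, not taken from a real schema.
    """
    mapping = build_column_mapping(batch_df.columns, exclude=(op_col,))
    is_deleted = f"s.{op_col} = '{deleted_value}'"
    (
        delta_table.alias("t")
        .merge(batch_df.alias("s"), f"t.{key} = s.{key}")
        # Matched rows flagged as deleted are removed from the target.
        .whenMatchedDelete(condition=is_deleted)
        # Remaining matched rows are updated with every column except the op column.
        .whenMatchedUpdate(set=mapping)
        # New rows are inserted unless they arrive already flagged as deleted.
        .whenNotMatchedInsert(condition=f"NOT ({is_deleted})", values=mapping)
        .execute()
    )
```

In a streaming job, a function like `merge_batch` would typically be called from the `foreachBatch` callback with the micro-batch DataFrame.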

User16753725469
Contributor II

Please go through this documentation: https://docs.delta.io/latest/api/python/index.html

Thank you for the document. It's very helpful.

From the doc, I thought I would be able to use

from delta.tables import DeltaTable

deltaTable = (
    DeltaTable.replace(sparkSession)  # recreates the table with df's schema
    .tableName("testTable")
    .addColumns(df.schema)
    .execute()
)

to update the schema in the code when some schema change is detected.

However, this piece of code really does replace the table, so not only is the schema updated, but all the existing data is gone as well.

Do you have any other suggestion?
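A non-destructive alternative might be to add only the missing columns with `ALTER TABLE ... ADD COLUMNS`, which keeps the existing rows and leaves the new column null for them. The sketch below is an illustration under assumptions, not a confirmed solution from this thread: the helper names are hypothetical, `Op` is assumed to be the column to leave out, and schemas are compared as (name, type) pairs with case-insensitive names.

```python
def missing_columns(source_fields, target_fields, exclude=("Op",)):
    """Return (name, type) pairs present in the source but absent from the target."""
    target_names = {name.lower() for name, _ in target_fields}
    return [
        (name, dtype)
        for name, dtype in source_fields
        if name not in exclude and name.lower() not in target_names
    ]


def add_columns_ddl(table_name, new_fields):
    """Build an ALTER TABLE statement, or None when there is nothing to add."""
    if not new_fields:
        return None
    cols = ", ".join(f"`{name}` {dtype}" for name, dtype in new_fields)
    return f"ALTER TABLE {table_name} ADD COLUMNS ({cols})"


def evolve_target(spark, table_name, source_df, exclude=("Op",)):
    """Add columns found in `source_df` but not yet in the target table."""

    def as_pairs(df):
        # simpleString() yields type names such as 'string' or 'int'.
        return [(f.name, f.dataType.simpleString()) for f in df.schema.fields]

    new_fields = missing_columns(
        as_pairs(source_df), as_pairs(spark.table(table_name)), exclude
    )
    ddl = add_columns_ddl(table_name, new_fields)
    if ddl is not None:
        spark.sql(ddl)
```

Calling something like `evolve_target(spark, "testTable", df)` before each merge would extend the table's schema when a new column shows up, without touching the data already stored.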
