Change Data Feed Databricks

Mihai_Cog
Contributor

Hello,

I am testing the Change Data Feed feature on Databricks with PySpark (Delta format, of course), and there is something I don't understand:

  1. I created a table
  2. Saved some data inside
  3. Enabled Change Data Feed feature
  4. Applied a merge with a DataFrame that involves inserts, deletes, and updates:
    spark.sql('MERGE INTO test t USING src s ON s.Id = t.Id and s.date_field = t.date_field WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * WHEN NOT MATCHED BY SOURCE THEN DELETE')
  5. Inspected the data:

SELECT * FROM table_changes('table', 2); -- the merge commit is version 2

I observed a row with an ID (for example, 234123) appearing twice. In the _change_type column, the first occurrence shows update_preimage and the second shows update_postimage.

I would say it is normal for the row to appear twice, since something may have changed for that ID. But when I compare every value across both occurrences, I see NO CHANGE.

Is this normal?


6 REPLIES

Kaniz
Community Manager

Hi @Mihai_Cog, yes, this is normal behaviour. When you enable the Change Data Feed feature and apply a merge operation, the system records change events for all data written into the table. This includes the row data and metadata indicating whether each row was inserted, deleted, or updated.

In your case, the _change_type column shows update_preimage and update_postimage for the same ID, which means the row with that ID was involved in an update operation: update_preimage represents the state of the row before the update, and update_postimage represents its state after. Even if no values actually changed during the update, the Change Data Feed still records it as an update event. Hence the row appears twice, once with update_preimage and once with update_postimage, in the _change_type column.

Sources:

- [Delta Change Data Feed](https://docs.databricks.com/delta/delta-change-data-feed.html)
- [Delta Merge](https://docs.databricks.com/delta/merge.html)
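As a minimal sketch of this behaviour (plain Python, not Spark; the function name and row dicts are illustrative, not Delta APIs), every matched row in an UPDATE SET * emits a pre-image/post-image pair regardless of whether values differ:

```python
# Simplified model of the Change Data Feed for MERGE ... WHEN MATCHED THEN
# UPDATE SET *: every matched row emits an update_preimage/update_postimage
# pair, even when the source and target values are identical.
def cdf_rows_for_update(target_row, source_row):
    """Return the change-feed rows a matched UPDATE emits."""
    return [
        {**target_row, "_change_type": "update_preimage"},
        {**source_row, "_change_type": "update_postimage"},
    ]

target = {"Id": 234123, "salary": 1000}
source = {"Id": 234123, "salary": 1000}  # identical values, no real change

for row in cdf_rows_for_update(target, source):
    print(row)  # the same ID appears twice, even though nothing changed
```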

Thank you @Kaniz for the answer, but this behaviour is not what CDF is meant for.

I will explain myself:

1. With this behaviour, I cannot identify which rows were really updated.

2. I cannot find out the count of real updates.

3. Storing all this data, and not only the updates/inserts/deletes, will result in a huge table within a few months.

4. Merging the changes into the base table will take much more time than merging only the real changes.

How can we cover all of these inconveniences?

Tharun-Kumar
Honored Contributor II

@Mihai_Cog When you find an entry with the same values in both the pre-image and post-image, it means that record was indeed updated, but no value changed. The record went through the merge statement, which rewrote its column values. Merge does not know whether a value actually changes or whether it is rewriting the same value; all the update clause does is identify the records eligible for update and write whatever values the source table has.

If you only want real updates, I would suggest adding a filter to your merge statement to eliminate records that have the same values in the columns you are interested in.
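A sketch of that idea in plain Python (the helper and column names are hypothetical, not Delta APIs): keep only matched source rows whose values actually differ from the target, so no-op updates never reach the merge and never appear in the change feed:

```python
# Pre-merge filter: drop source rows that match a target row with
# identical values, keeping only rows that represent a real update.
def real_updates(source_rows, target_rows, key="Id"):
    target_by_key = {r[key]: r for r in target_rows}
    changed = []
    for s in source_rows:
        t = target_by_key.get(s[key])
        if t is not None and s != t:  # matched, and at least one value differs
            changed.append(s)
    return changed

target = [{"Id": 1, "salary": 100}, {"Id": 2, "salary": 200}]
source = [{"Id": 1, "salary": 100},  # unchanged -> filtered out
          {"Id": 2, "salary": 250}]  # real update -> kept
print(real_updates(source, target))  # [{'Id': 2, 'salary': 250}]
```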

@Tharun-Kumar thank you for your answer.

To understand better: are you saying to add a filter, meaning something like this?

.whenMatchedUpdate(set = {
    "id": "updates.id",
    "firstName": "updates.firstName",
    "middleName": "updates.middleName",
    "lastName": "updates.lastName",
    "gender": "updates.gender",
    "birthDate": "updates.birthDate",
    "ssn": "updates.ssn",
    "salary": "updates.salary"
})

Should I list all the columns I am interested in, so that I get exactly the real changes and can do my merge at the upper level with only those real changes?

 

Tharun-Kumar
Honored Contributor II

@Mihai_Cog 

You have to split your merge statement into two parts: one for updates, one for inserts/deletes.

MERGE INTO test t USING src s ON s.Id = t.Id and s.date_field = t.date_field and s.fields <> t.fields WHEN MATCHED THEN UPDATE SET * 

MERGE INTO test t USING src s ON s.Id = t.Id and s.date_field = t.date_field WHEN NOT MATCHED THEN INSERT * WHEN NOT MATCHED BY SOURCE THEN DELETE

In the first statement, you include in the match condition all the columns where you don't want an update to happen when the value is the same, and then perform the update. The second statement handles the inserts and deletes.
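To illustrate the effect of the split (plain Python, illustrative only, not how Delta executes MERGE): unchanged matched rows produce no change event, while real updates, inserts, and deletes each do:

```python
# Simplified model of the two-statement merge above: pass 1 updates only
# matched rows whose values differ (the s.fields <> t.fields condition);
# pass 2 inserts unmatched source rows and deletes target rows absent
# from the source. Returns the change events produced.
def split_merge(target, source, key="Id"):
    src_by_key = {r[key]: r for r in source}
    changes = []
    # Pass 1: UPDATE only matched rows with at least one differing value.
    for i, t in enumerate(target):
        s = src_by_key.get(t[key])
        if s is not None and s != t:
            changes.append(("update", s))
            target[i] = dict(s)
    # Pass 2: INSERT unmatched source rows ...
    tgt_keys = {r[key] for r in target}
    for s in source:
        if s[key] not in tgt_keys:
            changes.append(("insert", s))
            target.append(dict(s))
    # ... and DELETE target rows not matched by the source.
    kept = []
    for t in target:
        if t[key] in src_by_key:
            kept.append(t)
        else:
            changes.append(("delete", t))
    target[:] = kept
    return changes

target = [{"Id": 1, "v": 10}, {"Id": 2, "v": 20}, {"Id": 3, "v": 30}]
source = [{"Id": 1, "v": 10},  # unchanged -> no change event at all
          {"Id": 2, "v": 25},  # real update
          {"Id": 4, "v": 40}]  # new row
print(split_merge(target, source))  # only Id 2, 3, 4 produce events
```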

Thank you for all your help.

If I will have any other questions, I'll be back 😜
