Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to use the change data feed when the schema changes between delta table versions?

LasseL
New Contributor III

How do I use the change data feed when the delta table schema changes between versions?

I tried to read the change data feed in parts (in the code snippet I read version 1372, because the schemas of versions 1371 and 1373 differ), but I get this error:

UnsupportedOperationException: [DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_DATA_SCHEMA] Retrieving table changes between version 1372 and 1372 failed because of an incompatible data schema.

Your read schema is {"type":"struct","fields":[{"name":..."metadata":{}}]} at version 1375, but we found an incompatible data schema at version 1372.

Code snippet:

df_test = (
    spark.read.format('delta')
    .option("readChangeFeed", "true")
    .option("startingVersion", 1372)
    .option("endingVersion", 1372)
    .load(path)
)
 
df_test.display()
 
So does that mean that the change data feed uses the schema of the last (or the first) version in the feed? Can this be changed? Streaming is not an option (in streaming you can enable schema change tracking).
Is there any trick to get this working?
4 REPLIES

raphaelblg
Databricks Employee

Hi @LasseL, please check "What is the schema for the change data feed?" in the docs. It might help you.
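
For quick reference, a minimal sketch of what that page covers: the change feed returns the table's columns plus the CDF metadata columns. The table location and starting version below are placeholders, and CDF is assumed to be enabled on the table.

# Hedged sketch: inspect the CDF metadata columns alongside the table columns.
# `path` is a placeholder for a Delta table location with CDF enabled.
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1370)
    .load(path)
)
changes.select("_change_type", "_commit_version", "_commit_timestamp").display()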

 

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

ritutiw10
New Contributor II

@raphaelblg, @LasseL, were you able to solve this issue of CDF schema evolution by defining the start and end version?

LRALVA
Honored Contributor

@LasseL 

When you read from the change data feed in batch mode, Delta Lake always uses a single schema:
- By default, it uses the latest table version's schema, even if you're only reading an older version.
- On Databricks Runtime ≥ 12.2 LTS with column mapping enabled, batch CDF reads use the end version's schema instead, but they still fail if your version range spans a non-additive schema change (e.g. a drop, rename, or type change).
- Streaming reads (spark.readStream.option("readChangeFeed", "true")) support schema evolution automatically, but batch reads do not.

This means CDF tried to apply the current/latest schema (v1375) when reading v1372 and detected that the columns/types didn't match.

There is no built-in option in the Python batch API to switch to the start or end schema; CDF's batch path is fixed to the latest (or, with column mapping, the end) schema.
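
If you still need batch-like behavior, one possible workaround (a minimal sketch, assuming CDF is enabled and using the placeholder names table_path, checkpoint_dir, and target_path) is to run the streaming path with an availableNow trigger, which processes the currently available changes and then stops:

# Hedged sketch: read CDF as a stream so schema changes can be tracked,
# then stop once the available changes have been processed.
# `table_path`, `checkpoint_dir`, and `target_path` are placeholders.
(
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1372)
    .option("schemaTrackingLocation", checkpoint_dir)  # lets the stream follow schema changes
    .load(table_path)
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)  # process available changes, then stop
    .start(target_path)
    .awaitTermination()
)

Depending on the change, the stream may still stop at a non-additive schema change and ask for the confirmation settings described in the schema tracking docs before it continues.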

 

LR

Seb_G
New Contributor II

Hi there! You seem to have a good grasp of the batch vs. streaming schema evolution topic.

Do you know if using `foreachBatch` invokes the batch CDF read semantics that you've described above?
I have this streaming read:

spark.readStream.format("delta")
.option("schemaTrackingLocation", checkpoint_path)
.option("readChangeFeed", "true")
.table(sourcepath)
.writeStream
.foreachBatch(batch_func)
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
).start().awaitTermination()

That worked well until I made a schema change. Now it indeed seems to read the change data feed using the latest schema and throws an error saying the new column is not found.

Per the info box [here](https://learn.microsoft.com/en-gb/azure/databricks/delta/delta-change-data-feed#change-data-feed-lim...) and my current setup (Databricks Runtime 15.4 LTS), I should be able to handle column mapping in a streaming read.

The source table delta history is 0-indexed and is as follows:

Version 0: CREATE TABLE AS SELECT
...
Version 11: WRITE [Last version consumed by streaming read]
Version 12: ADD COLUMNS
Version 13: SET TBLPROPERTIES {"delta.columnMapping.mode": "name"}
Version 14: DROP COLUMNS
Version 15: ADD COLUMNS
Version 15: CHANGE COLUMNS

What is surprising is that when I put `.option("endingVersion", 11)` in the streaming read, the same error occurs, even though that version still uses the old source schema.