- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
05-05-2025 04:12 AM
Hi there! You seem have a good grasp on the batch vs. streaming schema evolution topic.
Do you know if using `foreachBatch` invokes the batch CDF read semantics that you've described above?
I have a streaming read
spark.readStream.format("delta")
.option("schemaTrackingLocation", checkpoint_path)
.option("readChangeFeed", "true")
.table(sourcepath)
.writeStream
.foreachBatch(batch_func)
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
).start().awaitTermination()That has worked well until I made a schema change. Now it indeed seems to read the change data feed using the latest schema and throws an error where the new column is not found.
Per the info box [here](https://learn.microsoft.com/en-gb/azure/databricks/delta/delta-change-data-feed#change-data-feed-lim...) and my current setup (Databricks runtime 15.4 LTS) I should be able to handle column mapping in a stream read.
The source table delta history is 0-indexed and is as follows:
Version 0: CREATE TABLE AS SELECT
...
Version 11: WRITE [Last version consumed by streaming read]
Version 12: ADD COLUMNS
Version 13: SET TBLPROPERTIES "properties","{"delta.columnMapping.mode":"name"}
Version 14: DROP COLUMNS
Version 15: ADD COLUMNS
Version 15: CHANGE COLUMNS
What is surprising is that when I put ` .option("endingVersion", 11) ` in the streaming read the same error occurs, even though this is still on the old source schema.