2 weeks ago
Hi,
A column was deleted on the source table. When I ran LSDP, it failed with the error DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_LOG: Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename, drop, or datatype changes).
What is the recommended approach to handle column deletions or datatype changes on a source table in Lakeflow SDP?
a week ago
Hi IM_01,
You can set pipelines.reset.allowed as a table property directly in your pipeline definition. The approach depends on whether you are using Python or SQL:
Python:
import dlt

# `spark` is provided by the pipeline runtime
@dlt.table(
    table_properties={"pipelines.reset.allowed": "true"}
)
def my_streaming_table():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/path/to/data")
    )
SQL:
CREATE OR REFRESH STREAMING TABLE my_streaming_table
TBLPROPERTIES ("pipelines.reset.allowed" = "true")
AS SELECT * FROM STREAM(LIVE.source_table);
Setting this to "true" allows a full refresh of that specific table, which is what you need to resolve the DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_LOG error. Once you set the property and run a full refresh to pick up the schema change, you may want to set it back to "false" afterward to prevent accidental full refreshes in production.
More detail on pipeline table properties is available in the docs: Pipeline table properties.
2 weeks ago
Hi @IM_01,
This error means the upstream Delta source had a non-additive schema change (drop/rename/type change), and SDP's streaming read can't continue from the existing checkpoint.
As of today, only additive changes are supported transparently in SDP. Dropping or renaming a column, or changing its type in place, is treated as a breaking change for streaming reads. The recommended fix is to run a full refresh of the affected streaming tables (or the whole pipeline) after the schema change.
For column deletions, you have two options: run a full refresh of the affected streaming tables, or avoid dropping the column at all and simply stop using it downstream.
For data type changes: a full refresh of the affected streaming tables is currently the only supported path.
The message about schemaTrackingLocation and allowSourceColumnTypeChange comes from the underlying Delta streaming engine. In Lakeflow SDP those options aren't currently configurable, so you can't resolve this particular error just by adding them to your pipeline definition. Instead, you need to treat the drop/type change as a breaking change and run a full refresh of the affected streaming tables. For more background on schemaTrackingLocation and column mapping, see the docs: https://docs.databricks.com/aws/en/delta/column-mapping.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.
2 weeks ago - last edited 2 weeks ago
Hi @Ashwin_DSA @SteveOstrowski
As part of deleting a column on the target table, I tried changing the column mapping mode to name, and it errors out with SET_TBLPROPERTIES_NOT_ALLOWED_FOR_PIPELINE_TABLE.
Could you please help me with this?
2 weeks ago
Hi @IM_01,
It is working as expected. In Lakeflow SDP, streaming tables and materialised views are pipeline-managed tables, so you can't change low-level Delta properties (including delta.columnMapping.mode) with ALTER TABLE ... SET TBLPROPERTIES. Those are reserved for the engine, not for direct user DDL.
That also means you can't enable column mapping + schemaTrackingLocation on an SDP table yourself to work around DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_LOG, even though the generic Delta docs talk about that option.
For your scenario:
Trying to toggle delta.columnMapping.mode = 'name' on the SDP target table is not supported and won't unblock this error.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.
2 weeks ago
Hi @Ashwin_DSA
Could you please elaborate on ignoring it in logic, Ashwin?
Also, after reading the source table, even though I added the deleted column back and populated it with NULL in the target:
df = df.withColumn("delcol", lit(None))
it still fails.
2 weeks ago
Hi @IM_01 ,
Sorry! I may have clicked the wrong button and incorrectly accepted your comment as a solution! I don't know how to revert it. I'll check that separately.
When I said "ignore it in logic," I meant: don't physically drop the column from the source Delta table. Instead, leave the column in the source schema and stop using it downstream (don't select/join on it, or treat it as always NULL). That way, the streaming read still sees a stable schema and doesn't error.
In your case, the column was already dropped from the source, so the error is raised before your transformation runs. Adding it back with:
df = df.withColumn("delcol", lit(None))
only changes the in-memory DataFrame after the read. It does not fix the fact that the source table's schema changed incompatibly, so the stream still fails.
With SDP today, you essentially have two supported options: run a full refresh of the affected streaming table so the checkpoint is rebuilt against the new schema, or avoid the incompatible change in the first place (keep the column in the source and stop using it downstream).
Trying to work around this with column mapping / schemaTrackingLocation isn't supported for SDP-managed tables, which is why SET_TBLPROPERTIES fails.
Hope that clarifies.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.
2 weeks ago
Hi @Ashwin_DSA
I am a bit confused: even though I tried adding the column back, it's still throwing an error.
Could you please provide a better understanding of why it's throwing an error even after adding the column? I was thinking it tries to match the schema while writing to the target; please let me know if I am missing something.
2 weeks ago
I ran into this kind of issue some time back! Adding the column back doesn't undo the schema change from the streaming engine's perspective.
When you dropped the column, Delta recorded that as a specific transaction in the table's log (say, version N). When you added it back, that's a separate transaction (version N+1). The streaming checkpoint remembers exactly which version it last processed. When it tries to advance through the log, it hits the drop at version N and fails right there. It doesn't look ahead to see that you re-added the column in N+1. It just sees a non-additive schema change at N and stops.
Think of it this way: the error isn't about the current schema of the source table. It's about the history of schema changes the stream has to replay through. The stream processes the Delta log sequentially, and it can't skip over an incompatible change even if a later transaction "fixes" it.
So even though your source table's current schema now matches what the checkpoint expects, the problematic drop transaction is still sitting in the log between the checkpoint's last processed version and now.
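The sequential-scan behaviour described above can be sketched in plain Python. This is a toy model with made-up versions and change types, not the real Delta log format:

```python
# Toy model of why the stream fails at the drop transaction even though a
# later version re-adds the column. Hypothetical data, not the real Delta log.

# Each entry: (version, change_type, schema_after). Assume the checkpoint
# last committed version 4; the column was dropped at version 5 and
# re-added at version 6.
log = [
    (5, "drop_column", ["id", "amount"]),           # non-additive change
    (6, "add_column",  ["id", "amount", "delcol"]), # additive change
]

ADDITIVE = {"add_column"}

def advance_stream(checkpoint_version, log):
    """Scan the log sequentially from the checkpoint offset and fail on the
    first non-additive entry, without ever looking ahead."""
    for version, change, schema in log:
        if version <= checkpoint_version:
            continue
        if change not in ADDITIVE:
            raise RuntimeError(f"incompatible schema change at version {version}")
    return "stream advanced"

try:
    advance_stream(4, log)
except RuntimeError as e:
    print(e)  # fails at version 5; version 6 is never even inspected
```

Note that `advance_stream(5, log)` would succeed, because only the additive change at version 6 remains past that offset; that is exactly what a full refresh achieves by resetting the offset past the incompatible history.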
To get past this you'll need to do a full refresh of the affected streaming table in SDP. That resets the checkpoint entirely and starts reading from the current state of the source table with its current schema. In the pipeline UI, you can select just that specific table for a full refresh rather than refreshing everything.
Going forward, if you need to temporarily remove a column from the source, a safer approach is to create a view on top of the source table that excludes the column, and have your SDP pipeline read from that view instead. That way the underlying Delta table's schema stays stable from the stream's perspective.
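That view-based approach might look like the following. This is a minimal sketch; the table, view, and column names are made up:

```sql
-- Expose a stable schema to the pipeline instead of dropping the column
-- from the underlying Delta table. Hypothetical names.
CREATE OR REPLACE VIEW source_table_for_pipeline AS
SELECT id, amount  -- everything except the column you want to retire
FROM source_table;
```

The pipeline then streams from `source_table_for_pipeline`, and columns can be added to or removed from the view's select list without touching the Delta table's schema history.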
Hope this helps! If it did, please mark it as โAccept as Solutionโ so others can easily find it.
2 weeks ago
Hi @Kirankumarbs
Please let me know if my understanding is correct:
So when it tries to read the source table, it matches the source schema in the checkpoint with the current source schema, and additive changes are treated as new changes that are not yet part of the checkpoint schema.
Suppose I had added a new column at an initial stage; then it would do two levels of comparison:
1. source schema
2. new columns
So it does the comparison step by step, by replaying the previous steps up to version N.
2 weeks ago
Pretty close, but let me clarify a bit, as it is a little more involved and important to understand. Knowing what is actually happening will help you debug this whenever you come across it again!
The streaming checkpoint doesn't replay every schema change step by step. What actually happens is that the checkpoint stores the schema and the last processed version offset. When the stream advances, it reads the Delta log entries sequentially from that offset forward. If it encounters any transaction in that log that represents a non-additive change (drop, rename, type change), it stops right there with the error. It doesn't look at the transactions before or after; it just refuses to move past that one incompatible entry.
2 weeks ago - last edited 2 weeks ago
Hi @Kirankumarbs
Please do reconfirm if my understanding is correct.
Summarizing above points:
So the checkpoint stores the target schema (schemaLocation) and the source offset, which in this case is the Delta table version (offsets folder). On the next micro-batch run, it starts reading from N+1 up to the current version of the source; if any version includes non-additive changes, it stops and throws an error, and if it goes well, at write time it compares the schema of the DataFrame with the target schema.
2 weeks ago
I am sorry, but I have to correct your statement, especially since this is the foundation!
The checkpoint doesn't store the target schema. It stores the source schema that was in effect when the stream last ran. The offset (yes, stored in the offsets folder) tracks the last committed version from the source Delta table. So on the next micro-batch it reads from version N+1 onward; that part you've got right.
The schema check happens on the read side, not the write side. When the stream advances through the Delta log from N+1 forward, it validates each transaction entry against the schema it has recorded. If it hits a non-additive change (drop, rename, type change) at any version in that range, it fails immediately before it even produces a DataFrame for that micro-batch. It never gets to the write stage.
The write-side schema comparison is a separate thing. That's where mergeSchema or schema evolution settings come in; they control whether the target table accepts new columns or structural changes in the incoming DataFrame. But the error you're seeing (DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_LOG) is purely a read-side failure. The stream can't even construct the DataFrame to write, because it can't reconcile the source log history.
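That separate write-side check can be sketched in plain Python. This is a toy model with made-up names; real Delta schema evolution compares full column types, not just names:

```python
# Toy sketch of the write-side check: the incoming batch's columns are
# compared against the target schema, and whether extra columns are
# accepted depends on a mergeSchema-style setting. Hypothetical names.
target_schema = ["id", "amount"]

def write_batch(batch_columns, merge_schema=False):
    extra = [c for c in batch_columns if c not in target_schema]
    if extra and not merge_schema:
        raise ValueError(f"unexpected columns in incoming batch: {extra}")
    return target_schema + extra  # evolved target schema

print(write_batch(["id", "amount", "delcol"], merge_schema=True))
# ['id', 'amount', 'delcol']
```

The key point is that this check only ever runs on a DataFrame the read side has already produced; with the read-side error above, execution never gets this far.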
So the flow is:
1. Read the last committed offset (and the recorded source schema) from the checkpoint.
2. Scan the Delta log forward from that offset, validating each transaction against the recorded schema.
3. Fail immediately on any non-additive change; otherwise build the micro-batch DataFrame.
4. Only then run the write-side schema comparison against the target table.
Hope this helps! And sorry for the lengthy explanation, but I feel enabling you is more helpful than a one-shot answer, so that you can make the right call during your migration!
2 weeks ago
I appreciate the detailed explanation, as my goal is to gain a clear understanding.
As the Delta log of the target table already captures the table metadata, the target DataFrame schema is validated against this metadata, correct?
I also have two follow up questions:
In the case of Auto Loader, it throws an error on column addition due to the default schema evolution mode, right? Whereas with Delta streaming tables as a source, errors occur on non-additive schema changes. Could you please confirm if my understanding is correct?
And in the case of a message bus like Kafka, the offset is the consumer group offset, right? Can you please share any documentation on this?
Thanks for being so patient with me.
2 weeks ago
> As the Delta log of the target table already captures the table metadata, the target DataFrame schema is validated against this metadata, correct?
Yes!
> In the case of Auto Loader, it throws an error on column addition due to the default schema evolution mode, right? Whereas with Delta streaming tables as a source, errors occur on non-additive schema changes. Could you please confirm if my understanding is correct?
Yes, that too! But the reasons are different. Auto Loader's default schema evolution mode (addNewColumns) deliberately stops the stream when it detects a new column in the incoming files. It does this so it can update the inferred schema at the schemaLocation before continuing. It's a controlled pause, not really an error: restart the stream and it picks up with the new column included. You can change this behavior with cloudFiles.schemaEvolutionMode if you want it to handle new columns silently.
So, Autoloader is stricter about column additions by default, Delta streaming source is stricter about column removals and renames.
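For reference, a hedged sketch of switching that mode in SQL, so new columns land in the rescued data column instead of pausing the stream. The table name and path are made up; `schemaEvolutionMode` and the `rescue` mode are the documented Auto Loader options:

```sql
-- Hypothetical sketch: capture new columns in _rescued_data rather than
-- stopping the stream on each schema change.
CREATE OR REFRESH STREAMING TABLE my_bronze
AS SELECT * FROM STREAM read_files(
  '/path/to/data',
  format => 'json',
  schemaEvolutionMode => 'rescue'
);
```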
> And In case of message bus like kafka the offset is the consumer group offset right?
Yes. When you use Kafka as a streaming source, the offset tracked in the Spark checkpoint is essentially the consumer offset (a topic, partition, offset tuple). It's not using Kafka's consumer group offset management, though: Spark manages its own offsets in the checkpoint directory independently of Kafka's consumer group mechanism. So even if you have a consumer group configured, Spark ignores it for offset tracking and relies entirely on its own checkpoint.
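To make the shape of those checkpointed offsets concrete, here is an illustrative sketch of the JSON structure the Kafka source records: a map of topic to partition to the next offset to read. The exact checkpoint file layout is an internal detail, and the topic name and numbers here are made up:

```python
import json

# Illustrative offsets entry: topic -> partition -> next offset to read.
# Hypothetical values; real checkpoints wrap this in additional metadata.
offset_json = '{"orders": {"0": 1500, "1": 1498}}'

offsets = json.loads(offset_json)
print(offsets["orders"]["0"])  # 1500
```

Because Spark owns these offsets, deleting or resetting the checkpoint directory restarts consumption according to `startingOffsets`, regardless of any consumer group state in Kafka.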
Some documentation that can help you:
- Stream processing with Apache Kafka and Databricks
- Auto Loader Config
Happy to keep going if you have more questions.
Hope this helps! If it does, could you please mark it as "Accept as Solution"? That will help other users quickly find the correct fix.
2 weeks ago - last edited 2 weeks ago
@Kirankumarbs I have tried doing a full refresh of the tables and it worked, thank you so much! But in the UC UI I still see the old column with void as its datatype,
and on querying the table it throws an error about an incompatible schema change. Could you please help with this?