Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Lakeflow SDP failed with DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_LOG

IM_01
Contributor

Hi,

A column was deleted on the source table. When I ran Lakeflow SDP, it failed with the error DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_LOG: Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename, drop, or datatype changes).

What is the recommended approach to handle column deletions or datatype changes on a source table in Lakeflow SDP?

1 ACCEPTED SOLUTION


SteveOstrowski
Databricks Employee

Hi IM_01,

You can set pipelines.reset.allowed as a table property directly in your pipeline definition. The approach depends on whether you are using Python or SQL:

Python:

import dlt

@dlt.table(
    table_properties={"pipelines.reset.allowed": "true"}
)
def my_streaming_table():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/path/to/data")
    )

SQL:

CREATE OR REFRESH STREAMING TABLE my_streaming_table
TBLPROPERTIES ("pipelines.reset.allowed" = "true")
AS SELECT * FROM STREAM(LIVE.source_table);

Setting this to "true" allows a full refresh of that specific table, which is what you need to resolve the DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_LOG error. Once you set the property and do a full refresh to pick up the schema change, you may want to set it back to "false" afterward to prevent accidental full refreshes in production.

More detail on pipeline table properties is available in the docs: Pipeline table properties.


18 REPLIES

Ashwin_DSA
Databricks Employee

Hi @IM_01,

This error means the upstream Delta source had a non‑additive schema change (drop/rename/type change), and SDP’s streaming read can’t continue from the existing checkpoint.

As of today, only additive changes are supported transparently in SDP. Dropping or renaming a column, or changing its type in place, is treated as a breaking change for streaming reads. The recommended fix is to run a full refresh of the affected streaming tables (or the whole pipeline) after the schema change.

For column deletions, you have two options:

  1. If you truly want it gone: stop the pipeline --> apply the DDL change --> run a full refresh --> restart incremental runs.
  2. If you want to avoid full refreshes: don’t physically drop the column. Leave it in the schema (optionally stop populating it or ignore it in your SDP logic) so the streaming schema remains compatible.

For data type changes:

  1. If it’s a widening change (for example INT --> BIGINT, FLOAT --> DOUBLE), SDP now supports type widening in the PREVIEW channel. Enable pipelines.enableTypeWidening = true (or per table) so supported type changes can be applied without a full refresh of streaming tables (materialized views will still recompute).
  2. For other/incompatible type changes, model them as “add a new column with the new type + cast” and update SDP to use that column; or treat the in‑place change as a breaking change and do a full refresh.
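The "add a new column with the new type + cast" pattern from option 2 can be sketched as a pipeline table definition. This is a minimal sketch with hypothetical table and column names, not code from the thread:

```python
import dlt
from pyspark.sql import functions as F

# Hypothetical sketch: "amount" keeps its original INT type in the source;
# downstream consumers switch to "amount_decimal", so the streaming schema
# only ever changes additively.
@dlt.table
def orders_bronze():
    df = spark.readStream.table("source_catalog.source_schema.orders")
    return df.withColumn("amount_decimal", F.col("amount").cast("decimal(18,2)"))
```

Once all consumers have moved to the new column, the old one can be retired later with a planned full refresh.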
 

The message about schemaTrackingLocation and allowSourceColumnTypeChange comes from the underlying Delta streaming engine. In Lakeflow SDP those options aren’t currently configurable, so you can’t resolve this particular error just by adding them to your pipeline definition. Instead, you need to treat the drop/type change as a breaking change and run a full refresh of the affected streaming tables. For more background on schemaTrackingLocation and column mapping, see the docs: https://docs.databricks.com/aws/en/delta/column-mapping.

 

If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.

 

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***

IM_01
Contributor

Hi @Ashwin_DSA @SteveOstrowski,

As part of deleting a column on the target table, I tried changing the column mapping mode to name, and it errors out with SET_TBLPROPERTIES_NOT_ALLOWED_FOR_PIPELINE_TABLE.
Could you please help me with this?

Ashwin_DSA
Databricks Employee

Hi @IM_01,

It is working as expected. In Lakeflow SDP, streaming tables and materialized views are pipeline-managed tables, so you can’t change low-level Delta properties (including delta.columnMapping.mode) with ALTER TABLE … SET TBLPROPERTIES. Those are reserved for the engine, not for direct user DDL.

That also means you can’t enable column mapping + schemaTrackingLocation on an SDP table yourself to work around DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_LOG, even though the generic Delta docs talk about that option.

For your scenario:

  • You dropped a column in the source. As explained before, that’s a non‑additive schema change (drop/rename/type change).
  • SDP currently treats that as a breaking change for streaming tables. The supported mitigation is to:
    • either restore/keep the column in the source and ignore it in your logic, or
    • run a full refresh of the affected SDP tables so they are rebuilt with the new schema and a fresh checkpoint.

Trying to toggle delta.columnMapping.mode = 'name' on the SDP target table is not supported and won’t unblock this error.

If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***

IM_01
Contributor

Hi @Ashwin_DSA 
Could you please elaborate on ignoring it in logic, Ashwin?
Also, after reading the source table, it fails even though I added the deleted column back and populated it with NULL in the target:
df = df.withColumn("delcol", lit(None))

Ashwin_DSA
Databricks Employee

Hi @IM_01 ,

Sorry! I may have clicked the wrong button and incorrectly accepted your comment as a solution! I don't know how to revert it. I'll check that separately. 🙂 

When I said "ignore it in logic," I meant: don’t physically drop the column from the source Delta table. Instead, leave the column in the source schema and stop using it downstream (don’t select/join on it, or treat it as always NULL). That way, the streaming read still sees a stable schema and doesn’t error.
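As a hedged sketch (table and column names here are made up), "stop using it downstream" can be as simple as an explicit select list in the pipeline definition:

```python
import dlt

# Illustrative only: "legacy_col" stays in the source schema, so the
# streaming read never sees a drop; the pipeline just never selects it.
@dlt.table
def customers_silver():
    return (
        spark.readStream.table("source_catalog.source_schema.customers")
        .select("customer_id", "name", "email")  # legacy_col intentionally omitted
    )
```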

In your case, the column was already dropped from the source, so the error is raised before your transformation runs. Adding it back with:

df = df.withColumn("delcol", lit(None))

only changes the in‑memory dataframe after the read. It does not fix the fact that the source table’s schema changed incompatibly, so the stream still fails.

With SDP today, you essentially have two supported options:

  1. Restore the column on the source table (same name/type), then run a full refresh of the affected SDP tables so checkpoints are rebuilt, or
  2. Leave the column dropped and run a full refresh of those SDP streaming tables so they’re rebuilt against the new schema.

Trying to work around this with column mapping / schemaTrackingLocation isn’t supported for SDP‑managed tables, which is why SET_TBLPROPERTIES fails.

Hope that clarifies.

If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***

IM_01
Contributor

Hi @Ashwin_DSA 

I am a bit confused: even though I tried adding the column back, it's still throwing the error.
Could you please give me a better understanding of why it throws an error even after adding the column? I was thinking it tries to match the schema while writing to the target. Please let me know if I am missing something.

 

Kirankumarbs
I have run into this kind of issue some time back! Adding the column back doesn't undo the schema change from the streaming engine's perspective.

When you dropped the column, Delta recorded that as a specific transaction in the table's log (say, version N). When you added it back, that's a separate transaction (version N+1). The streaming checkpoint remembers exactly which version it last processed. When it tries to advance through the log, it hits version N, sees the drop, and fails right there. It doesn't look ahead to see that you re-added the column in N+1. It just sees a non-additive schema change at N and stops.

Think of it this way: the error isn't about the current schema of the source table. It's about the history of schema changes the stream has to replay through. The stream processes the Delta log sequentially, and it can't skip over an incompatible change even if a later transaction "fixes" it.

So even though your source table's current schema now matches what the checkpoint expects, the problematic drop transaction is still sitting in the log between the checkpoint's last processed version and now.

To get past this you'll need to do a full refresh of the affected streaming table in SDP. That resets the checkpoint entirely and starts reading from the current state of the source table with its current schema. In the pipeline UI, you can select just that specific table for a full refresh rather than refreshing everything.

Going forward, if you need to temporarily remove a column from the source, a safer approach is to create a view on top of the source table that excludes the column, and have your SDP pipeline read from that view instead. That way the underlying Delta table's schema stays stable from the stream's perspective.
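A sketch of that view approach, with hypothetical names. Note that whether a streaming read over a view is supported can depend on your runtime and the view definition, so verify it in your environment:

```python
# The view hides the column you want to retire, so the stream sees a
# stable schema even while the underlying table keeps the column.
spark.sql("""
    CREATE OR REPLACE VIEW source_catalog.source_schema.orders_stable AS
    SELECT order_id, customer_id, amount   -- deliberately excludes legacy_col
    FROM source_catalog.source_schema.orders
""")
```

The SDP pipeline would then read from orders_stable instead of the underlying table.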

Hope this helps! If it did, please mark it as “Accept as Solution” so others can easily find it.

IM_01
Contributor

Hi @Kirankumarbs 
Please let me know if my understanding is correct 🙂
So when it tries to read the source table, it matches the source schema stored in the checkpoint against the current source schema, and treats add-on changes as new changes that are not yet part of the checkpoint schema.
Suppose I had added a new column at an initial stage; then it would do a 2-level comparison:
1. source schema
2. new cols
So it does the comparison step by step, by replaying the previous steps up to version N.

Kirankumarbs
Pretty close, but let me clarify a bit. It is a little more involved and important to understand, so that you know what you are doing and can debug it whenever you come across this again!

The streaming checkpoint doesn't replay every schema change step by step. What actually happens is that the checkpoint stores the schema and the last processed version offset. When the stream advances, it reads the Delta log entries sequentially from that offset forward. If it encounters any transaction in that log that represents a non-additive change (drop, rename, type change), it stops right there with the error. It doesn't look at the transactions before or after; it just refuses to move past that one incompatible entry.

IM_01
Contributor

Hi @Kirankumarbs 

Please do reconfirm if my understanding is correct 🙂
Summarizing the above points:
So the checkpoint stores the target schema (schemaLocation) and the source offset, which in this case is the Delta table version (offsets folder). On the next micro-batch run it starts reading from N+1 up to the current version of the source; if any version includes non-additive changes, it stops and throws the error, and if it goes well, at write time it compares the schema of the df with the target schema.



Kirankumarbs
I am sorry, but I have to correct your statement, especially since this is the foundation!

The checkpoint doesn't store the target schema. It stores the source schema that was in effect when the stream last ran. The offset (yes, stored in the offsets folder) tracks the last committed version from the source Delta table. So on the next micro-batch it reads from version N+1 onward; that part you've got right.

The schema check happens on the read side, not the write side. When the stream advances through the Delta log from N+1 forward, it validates each transaction entry against the schema it has recorded. If it hits a non-additive change (drop, rename, type change) at any version in that range, it fails immediately before it even produces a DataFrame for that micro-batch. It never gets to the write stage.

The write-side schema comparison is a separate thing. That's where mergeSchema or schema evolution settings come in — it controls whether the target table accepts new columns or structural changes in the incoming DataFrame. But the error you're seeing (DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_LOG) is purely a read-side failure. The stream can't even construct the DataFrame to write because it can't reconcile the source log history.

So the flow is:

  1. Stream wakes up, reads checkpoint (source schema + last source version offset)
  2. Reads Delta log from N+1 to current version
  3. If any entry has a non-additive schema change -> fails here, never reaches write
  4. If all entries are compatible -> constructs DataFrame with data from those versions
  5. At write time -> compares DataFrame schema with target table schema (this is where mergeSchema matters)
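The five steps above can be sketched as a toy model in plain Python (no Spark involved; the log entries and names are invented purely for illustration):

```python
# Toy model of the read-side check: each log entry is
# (version, schema_change_kind). Non-additive changes (drop/rename/
# type change) fail the stream before any data is read.
NON_ADDITIVE = {"drop", "rename", "type_change"}

def advance_stream(log, last_offset):
    """Walk the log from last_offset + 1; raise on a non-additive change."""
    for version, change in log:
        if version <= last_offset:
            continue
        if change in NON_ADDITIVE:
            raise RuntimeError(
                f"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_LOG at version {version}"
            )
    return max(v for v, _ in log)  # new offset: caught up to head of log

log = [(1, "data"), (2, "add_column"), (3, "drop"), (4, "add_column")]

# Checkpoint at version 2: the drop at version 3 blocks the stream, even
# though version 4 re-adds the column afterward.
try:
    advance_stream(log, last_offset=2)
except RuntimeError as e:
    print(e)

# A "full refresh" effectively resets the checkpoint to the current
# version, skipping the historical incompatible entry entirely.
print(advance_stream(log, last_offset=4))
```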

Hope this helps! And sorry for the lengthy explanation, but I feel enabling you is more helpful than a one-shot answer, so that you can make the right call while you do the migration!

IM_01
Contributor

I appreciate the detailed explanation as my goal is to gain a clear understanding 🙂
As the delta log of the target table already captures the table metadata, the target dataframe schema is validated against this metadata, correct?

I also have two follow up questions:
In the case of Autoloader, it throws an error on column addition due to the default schema evolution mode, right? Whereas with Delta streaming tables as a source, errors occur on non-additive schema changes. Could you please confirm if my understanding is correct?
And in the case of a message bus like Kafka, the offset is the consumer group offset, right? Can you please share any documentation on this?
Thanks for being so patient with me 🙂

Kirankumarbs
>As the delta log of the target table already captures the table metadata so the target dataframe schema is validated against this metadata correct?
Yes!

>In case of Autoloader it throws an error on column addition due to the default schema evolution mode right whereas in delta streaming tables as source errors occur in case of non-additive schema changes could you please confirm if my understanding is correct?
Yes, that too! But the reasons are different. Autoloader's default schema evolution mode (addNewColumns) deliberately stops the stream when it detects a new column in the incoming files. It does this so it can update the inferred schema at the schemaLocation before continuing. It's a controlled pause, not really an error: restart the stream and it picks up with the new column included. You can change this behavior with cloudFiles.schemaEvolutionMode if you want it to handle new columns silently.
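For reference, a hedged sketch of setting that option (the paths here are placeholders):

```python
# "rescue" keeps the declared schema and routes unexpected fields into
# _rescued_data instead of pausing the stream; the default is "addNewColumns".
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/path/to/schema")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load("/path/to/data")
)
```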

So: Autoloader is stricter about column additions by default, while the Delta streaming source is stricter about column removals and renames.

>And in the case of a message bus like Kafka, the offset is the consumer group offset, right?
Yes: when you use Kafka as a streaming source, the offset tracked in the Spark checkpoint is essentially the consumer offset (a topic, partition, offset tuple). It does not use Kafka's consumer group offset management, though; Spark manages its own offsets in the checkpoint directory, independently of Kafka's consumer group mechanism. So even if you have a consumer group configured, Spark ignores it for offset tracking and relies entirely on its own checkpoint.
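A minimal sketch (broker, topic, and paths are placeholders) showing where those offsets actually live:

```python
# Spark stores the (topic, partition, offset) progress in checkpointLocation;
# a Kafka group.id, if configured, is not used for offset storage.
stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")  # honored only on first start
    .load()
)
query = (
    stream.writeStream.format("delta")
    .option("checkpointLocation", "/path/to/checkpoint")  # offsets live here
    .toTable("target_catalog.target_schema.events_bronze")
)
```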

Some documentation that can help you:
- Stream processing with Apache Kafka and Databricks
- Auto Loader Config

Happy to keep going if you have more questions.

Hope this helps! If it does, could you please mark it as "Accept as Solution"? That will help other users quickly find the correct fix.

IM_01
Contributor

@Kirankumarbs I have tried doing a full refresh of the tables and it worked, thank you so much! But in the UC UI I still see the old column with void as its datatype, and querying the table throws an error saying incompatible schema change. Could you please help with this?