2 weeks ago
Hello,
Bronze: uses classic or job compute; Auto Loader with .option("mergeSchema", "true"). Schema evolution works correctly, and data goes to bronze.my_bronze_table.
Silver: uses serverless compute; the reader reads bronze.my_bronze_table, does all necessary transformations, and the writer creates silver.my_silver_table, which has a defined schema.
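The setup above could be sketched roughly as follows (a hypothetical sketch only; all paths, table names, and trigger settings are placeholders, not the actual job code):

```python
# Hypothetical sketch of the bronze/silver pipeline described above.
# All paths, schema locations, and checkpoint locations are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Bronze: Auto Loader on classic/job compute, with schema evolution.
bronze = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "/mnt/_schemas/bronze")   # placeholder
    .load("/mnt/landing/csv"))                                     # placeholder

(bronze.writeStream
    .option("checkpointLocation", "/mnt/_checkpoints/bronze")      # placeholder
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("bronze.my_bronze_table"))

# Silver: reads bronze on serverless, transforms, writes a defined schema.
silver = spark.readStream.table("bronze.my_bronze_table")
# ... transformations here ...
(silver.writeStream
    .option("checkpointLocation", "/mnt/_checkpoints/silver")      # placeholder
    .trigger(availableNow=True)
    .toTable("silver.my_silver_table"))
```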
The problem I am trying to resolve:
whenever the schema of bronze.my_bronze_table changes (thanks to schema evolution), the writer fails to write silver.my_silver_table with [STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE]: The provided value schema does not match existing schema in operator state.
The detailed debug log says clearly that the mismatch is in the schema of bronze.my_bronze_table.
As the silver layer uses serverless compute, I cannot set
2 weeks ago
Hi @cdn_yyz_yul,
Because the silver stream runs on serverless, you can’t relax state-store schema checks or set custom Spark configs. When the upstream bronze table schema evolves in a way that changes the schema of any stateful operator, the streaming query will correctly fail with STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE.
On serverless, the supported pattern is to treat this as a breaking change:
This is expected behaviour. The documentation here lists the Spark properties you can configure on serverless, and it also says the following...
You can also refer to this page, which lists serverless's limitations; it will take you back to the same page mentioned above.
If you want to avoid rebuilding state on every breaking schema change, you can run the silver stream on classic/job compute instead of serverless, where you can control Spark configs and design more flexible state-handling. However, I appreciate that it may not be what you are looking for.
If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.
2 weeks ago
serverless compute does not support
2 weeks ago
Thanks @Ashwin_DSA
I started to test changing the bronze layer from addNewColumns to rescue (schemaEvolutionMode) while keeping Auto Loader's format setting as .csv, then processing the _rescued_data at silver.
Or, as a last resort, having Auto Loader use the text format, then parsing and extracting columns at the silver layer.
I will need to do some testing to decide which method is simpler.
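For reference, the text-format fallback might look something like this (a hypothetical sketch with placeholder paths; the split logic assumes simple comma-separated lines with no quoting or embedded commas):

```python
# Hypothetical sketch: ingest raw lines with Auto Loader's text format and
# defer all CSV parsing to silver. Paths and column names are placeholders.
from pyspark.sql import functions as F

raw = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "text")
    .option("cloudFiles.schemaLocation", "/mnt/_schemas/bronze_text")  # placeholder
    .load("/mnt/landing/csv"))                                         # placeholder

# Bronze's schema is always a single string column 'value', so it never evolves.
# In silver, split and name columns explicitly (naive split; no quoted fields).
parsed = raw.select(F.split("value", ",").alias("fields"))
cols = parsed.select(
    F.col("fields")[0].alias("id"),      # hypothetical column
    F.col("fields")[1].alias("name"),    # hypothetical column
)
```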
2 weeks ago
Hi @cdn_yyz_yul,
Great. Both of the approaches you’re testing are reasonable ways to protect your stateful silver stream from upstream schema changes on serverless...
On the first option, with schemaEvolutionMode = "rescue" on CSV, you can keep Auto Loader reading CSV but switch to:
.option("cloudFiles.schemaEvolutionMode", "rescue")
.option("rescuedDataColumn", "_rescued_data")
In silver, you first do stateless parsing/flattening of _rescued_data into whatever extra fields you need and feed only a stable, fixed projection of columns into the stateful part of the query (aggregations/joins, etc.).
As long as the columns that participate in stateful operations don’t change schema, the state-store schema stays compatible, and you avoid STATE_STORE_*_SCHEMA_NOT_COMPATIBLE without needing any forbidden Spark confs on serverless.
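Put together, a sketch of that pattern might look like the following (column names, paths, and the stateful step are hypothetical; the real query may differ):

```python
# Hypothetical sketch: rescue mode at bronze, stateless parsing at silver,
# then a fixed projection before any stateful operator. All names are placeholders.
from pyspark.sql import functions as F

bronze = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "/mnt/_schemas/bronze")  # placeholder
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .option("rescuedDataColumn", "_rescued_data")
    .load("/mnt/landing/csv"))                                    # placeholder

# Stateless step: promote any fields you need out of _rescued_data.
parsed = bronze.withColumn(
    "new_field", F.get_json_object("_rescued_data", "$.new_field"))

# Stable contract: only these columns ever reach the stateful operators.
STATEFUL_COLS = ["device_id", "event_time", "value"]
stable = parsed.select(*STATEFUL_COLS)

# The stateful step sees a fixed schema even as bronze evolves.
agg = (stable
    .withWatermark("event_time", "10 minutes")
    .groupBy("device_id", F.window("event_time", "5 minutes"))
    .agg(F.sum("value").alias("total")))
```

The key design choice is that schema drift stops at the stateless projection, so the state-store schema under the checkpoint never changes.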
The other option, keeping the format as text and parsing at silver, is a more extreme version of the same idea. It gives you maximum isolation from source changes, but you lose CSV parsing at bronze and push more work into silver. You'd still want the same pattern: parse in a stateless layer, then project into a stable set of columns before any stateful operators.
Something to remember here is that even with these patterns, any intentional change to the schema of your stateful part in Silver (for example, adding new grouping keys or changing types used in aggregations/joins) is still a breaking state change and requires starting that stream with a new checkpoint location. That’s independent of serverless. It’s how Structured Streaming state recovery works.
Hope this helps.
2 weeks ago
The first option: setting the bronze table to use rescue.
In the silver layer,
2 weeks ago
Hi @cdn_yyz_yul,
In your new version of the silver stream, you’ve added extra transformations (parsing _rescued_data, join, unpivot). Even though final_silver_df has the same columns as before, this changes the internal schema of at least one stateful operator compared to what’s stored in the checkpoint. As per the structured streaming docs, any change to stateful operations (agg, dedupe, stream‑stream joins, etc.) between restarts from the same checkpoint is unsupported and will fail with a state schema compatibility error. That’s exactly what you’re seeing.
Because you’re on serverless, there’s no supported way to relax this check with Spark configs. The supported migration path is:
After this one‑time reset, subsequent upstream schema changes in bronze will be absorbed by _rescued_data, and your stateful part won’t need to change, so you won’t hit the state schema error again.
Try this first... print the schema of final_silver_df and compare it to DESCRIBE TABLE silver.my_silver_table. If they match, the error is definitely about the state, not the table schema. You can also temporarily run the updated query with the same write path but a fresh checkpoint dir to confirm it starts and writes successfully. That’s a quick proof that the mismatch is in the checkpointed state.
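One way to run that check interactively (a sketch, assuming final_silver_df is in scope in the notebook):

```python
# Hypothetical quick check: compare the streaming DataFrame's schema with the
# target table's schema. If they match, the failure is about checkpointed
# operator state, not the table schema.
expected = spark.table("silver.my_silver_table").schema
actual = final_silver_df.schema
print(actual.simpleString())
print(expected.simpleString())
print("table schema matches:", actual == expected)
```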
Hope this helps.
a week ago
Thanks @Ashwin_DSA
Continuing the topic: given the context discussed, when there are stateful operations in silver, the mismatch is definitely in the checkpointed state, not in the table schema.
I further tested this by removing the join (which is the single stateful operation) from the silver transformation, keeping the _rescued_data handling and unpivot as-is;
the stream writer at silver then writes the Delta table correctly with the same checkpoint location.
Summary of my test is:
- bronze: reads the first batch of .csv files, which have schema1 (set to use _rescued_data)
- silver: reads from bronze, transforms including unpivot, writes to silver.mytable using checkpoint location mycheckpoint
- bronze: put more .csv files in the source location; these new files have extra columns
- bronze: reads the second batch of .csv files; new columns are saved to _rescued_data by Auto Loader
- silver: runs the same code as above -- reads, transforms (including unpivot), writes to silver.mytable using checkpoint location mycheckpoint
- verify the new rows in silver.mytable.
=== This is to confirm the cause of the mismatch, with testing.
In production, the join introduces a meaningful identifier, i.e., a primary key, to the rows from the .csv files. I cannot remove it.
Now, what is the recommended design for such scenarios?
You mentioned:
1) Deploy the updated silver job (with _rescued_data handling, join, unpivot).
2) Run it with a new checkpointLocation so it rebuilds its state from scratch.
This is essentially the same as using cloudFiles.schemaEvolutionMode = addNewColumns, in the sense that a new checkpointLocation is required.
Given this behavior, I would consider addNewColumns simpler, since I do not have to handle _rescued_data.
3) Keep the subset of columns that participate in stateful ops stable going forward.
Handle future schema drift only in a stateless part of the pipeline
(e.g., parse _rescued_data, unpivot, then project back to the same fixed set of stateful columns before any aggregations/joins).
In production, the stateful operation (join) can not be removed.
Summary of my understanding:
- bronze: let Auto Loader infer and evolve the schema. -- The job will fail when Auto Loader finds new columns; a rerun will be fine.
- silver: the job will fail due to a checkpointed-state mismatch --> remove the existing checkpoint (optionally remove the Delta table), then rerun.
This is what we do currently. I am hoping to find a way so that "remove existing checkpoint" could be avoided.
But, after our discussion and my testing, it seems what we are doing is the most pragmatic solution.
Any other suggestions would be appreciated. I am interested in how schema changes should be handled when using structured streaming.
P.S. Regarding handling/parsing _rescued_data:
dynamically adding new columns reliably requires work.
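To illustrate why that takes work: _rescued_data arrives as a JSON string whose keys can differ row by row, so the full set of rescued keys has to be discovered before columns can be added. A small plain-Python sketch of the idea (hypothetical rows; a real pipeline would do this with Spark functions or a maintained allowlist of expected fields):

```python
import json

# Hypothetical rows as Auto Loader might produce them: _rescued_data is a
# JSON string, and its keys vary per row (None when nothing was rescued).
rows = [
    {"id": 1, "_rescued_data": '{"colour": "red"}'},
    {"id": 2, "_rescued_data": '{"size": 42, "_file_path": "/mnt/landing/a.csv"}'},
    {"id": 3, "_rescued_data": None},
]

# Step 1: discover the union of rescued keys, ignoring metadata keys
# such as _file_path.
rescued_keys = set()
for r in rows:
    if r["_rescued_data"]:
        rescued_keys |= {k for k in json.loads(r["_rescued_data"])
                         if not k.startswith("_")}

# Step 2: promote each rescued key to a real column, defaulting to None
# for rows where the key is absent.
promoted = []
for r in rows:
    extras = json.loads(r["_rescued_data"]) if r["_rescued_data"] else {}
    promoted.append({**{k: v for k, v in r.items() if k != "_rescued_data"},
                     **{k: extras.get(k) for k in sorted(rescued_keys)}})

print(promoted)
```

The discovery step is the expensive part: in Spark it typically means an extra pass over the data (or a fixed allowlist) before the new columns can be projected, which is exactly the extra work mentioned above.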
a week ago
Hi @cdn_yyz_yul,
Your experiment confirms the key point that with only _rescued_data handling + unpivot, the silver stream is effectively stateless, so reusing the same checkpoint works fine, even as bronze evolves. As soon as you add the join, you introduce state, and when the upstream change affects the join’s input schema, the state-store schema no longer matches what’s stored under the existing checkpoint. The restart from that checkpoint then correctly fails with a state schema mismatch, even though the final DataFrame/table schema looks unchanged.
On serverless, you also can’t bypass these checks using Spark configurations (no spark.sql.streaming.stateStore.*, no delta auto‑merge configs), so there isn’t a supported way to automatically evolve the state while keeping the same checkpoint. I think what you’re doing today is therefore the pragmatic, supported pattern...
And to your specific question about using _rescued_data vs. addNewColumns, they are equivalent for the checkpoint behaviour in silver layer. The reasons you might still choose one over the other are table‑design/downstream concerns, not streaming‑state concerns.
From my perspective, the addNewColumns approach means simpler code and no JSON parsing. However, your bronze (and possibly silver) schema keeps growing wider as every new field becomes a real column.
If you use rescue + _rescued_data, it keeps the physical schema more stable and captures unexpected fields and type mismatches into one semi‑structured column. But you pay with extra parsing logic if you want to promote some of those rescued fields.
So if your main goal is to minimise implementation complexity, and you’re already accepting that a new silver checkpoint is needed on those breaking changes, then preferring schemaEvolutionMode = "addNewColumns" is a perfectly reasonable and simpler choice.
Hope this helps.