cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Structured Streaming schemaTrackingLocation does not work with starting_version

Volker
Contributor

Hello Community,

I came across a strange behviour when using structured streaming on top of a delta table. 

I have a stream that I wanted to start from a specific version  of a delta table using the option option(

"starting_version", x) because I did not want to stream all the data of source the table but only the newly arriving one. To accomodate future (non-additive) schema changes I also set the option option("schemaTrackingLocation", checkpoint_location). 
Now, if I change the schema of the source table the DataStreamReader does not pick up the schema changes and writes these to the schemaTrackingLocation but still infers the old schema and I can't get it to pick up the schema changes.
After some trial and error I found out that the starting_version is probably the cause of the issue since I tried changing the schema on a stream without setting the starting_version option and it worked as intended and could pick up the schema changes on the source table.
I'm a bit confused since the starting_version should only have an effect when starting the stream and otherwise be ignored, as from the docs: 
They take effect only when starting a new streaming query. If a streaming query has started and the progress has been recorded in its checkpoint, these options are ignored. https://docs.databricks.com/en/structured-streaming/delta-lake.html#specify-initial-position
Did anybody have a similar problem? Is this an intended behaviour? How can I solve this issue? Where could I raise this issue?  
2 REPLIES 2

Volker
Contributor

I found that it actually is not related to specifying the starting_version.

I think I found the flaw in the flow how the schema is updated in the schemaTrackingLocation:

  • On the first readStream operation the _schema_log_... gets created
  • On the first writeStream operation the schema gets written to the _schema_log_
  • readStream will now read in the source table with the schema from _schema_log_
  • the schema in _schema_log_ only gets updated on a writeStream operation if there is a schema change detected in the source table
  • The check if the source table schema was updated happens after checking if the data schema and the target schema are compatible
  • If the source table schema and target table schema get updated simultaneously then the stream fails since it detects a schema mismatch between the data schema (which is the original schema) and the target table schema
  • Because the stream fails at this point the schema in the _schema_log_ does not get updated as well and the readStream will always only read in the original schema of the source table even though the schema changed.

This is quite annoying behaviour because of this you would first need to adapt the schema of the source table, let the stream fail since it detected a schema change (which will cause an update of the schema in the _schema_log_) and then update the schema of the target table. 
I know that I could use schema evolution but I do not want to use it if possible. 
Does anybody have experience with this and has a workaround?

mark_ott
Databricks Employee
Databricks Employee

This issue is related to how Delta Lakeโ€™s structured streaming interacts with schema evolution and options like startingVersion and schemaTrackingLocation. The behavior you've observed has been noted by other users, and can be subtle due to how checkpointing, versioning, and schema tracking are handled in combination. Hereโ€™s a breakdown, with solutions:

Core Issue

Setting startingVersion as an option in your stream appears to interfere with schema evolution, resulting in the stream persisting the old schemaโ€”even after the underlying Delta tableโ€™s schema has changed and updates are written to your schemaTrackingLocation.

When you remove startingVersion, the DataStreamReader detects schema changes correctly, provided schema tracking is enabled. From Databricks documentation, startingVersion should only be relevant for initializing a new stream, not for resumes from an existing checkpoint.

Why Does This Happen?

  • Schema Tracking and startingVersion:

    • When startingVersion is set, it can impact which version of the table the streaming query starts reading fromโ€”even if a checkpoint exists. Certain system versions and Spark releases may not fully disregard this option after checkpoint initialization due to nuanced implementation details behind the scenes.

    • The schema stored at schemaTrackingLocation is used for schema management, but if the stream is โ€œstuckโ€ at an older version due to how startingVersion is interpreted, it may not trigger schema updates.

  • Checkpoints and Restart Behavior:

    • On restart, if a checkpoint exists, the stream should ignore startingVersion. However, if the checkpoint is missing or corrupted, or if the startingVersion option is reapplied incorrectly, the schema may not evolve as expected.

Suggested Solution

  • Remove startingVersion After Initial Start:

    • Only use the startingVersion option when you first start the stream and no checkpoint exists.

    • After initial startup and successful checkpointing, remove startingVersion so schema tracking works properly on subsequent runs. Schema changes should then be detected and handled via your schemaTrackingLocation.

  • Confirm Checkpoint Health:

    • Make sure your checkpoint directory is healthy and present when restarting the stream. If the checkpoint is not present, startingVersion will be used.

  • Upgrade Databricks & Delta Lake:

    • Certain bugs with schema tracking and stream options have been resolved in later versions of Databricks and Delta Lake. Upgrading may resolve unexpected behaviors.

Workaround (if you need to retain startingVersion logic):

  • Start your stream without the startingVersion once the checkpoint is established, so ongoing runs see schema changes.

  • For testing, you can clear out your checkpoint directory (careful: this resets your offsets and may replay data) then set startingVersion to reinitialize from that version, but be sure to understand the replay implications.

References

Summary

This is not fully โ€œintendedโ€ behavior, but more a side-effect of how options and checkpointing interact in specific tool versions. Removing startingVersion after initial setup, maintaining your checkpoint, and enabling schema tracking is the correct pattern for evolving schemas in Delta Lake structured streaming. If the problem persists after following this approach and upgrading, it may warrant a support ticket or GitHub issue.

Join Us as a Local Community Builder!

Passionate about hosting events and connecting people? Help us grow a vibrant local communityโ€”sign up today to get started!

Sign Up Now