Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Structured Streaming schemaTrackingLocation does not work with starting_version

Volker
New Contributor III

Hello Community,

I came across some strange behaviour when using Structured Streaming on top of a Delta table.

I have a stream that I wanted to start from a specific version of a Delta table using option("starting_version", x), because I did not want to stream all the data in the source table, only the newly arriving data. To accommodate future (non-additive) schema changes I also set option("schemaTrackingLocation", checkpoint_location).
Now, if I change the schema of the source table, the DataStreamReader does not pick up the schema changes or write them to the schemaTrackingLocation. It still infers the old schema, and I can't get it to pick up the changes.
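For readers following along, the reader setup described above might look roughly like the sketch below. It assumes an existing SparkSession is passed in as `spark`; the function and argument names are illustrative and not taken from the post. Note that the Delta documentation spells the option `startingVersion`, which is what the sketch uses.

```python
def read_delta_stream(spark, source_table, schema_tracking_location, starting_version=None):
    """Build a streaming DataFrame over a Delta table (illustrative helper).

    `spark` is an existing SparkSession; all names here are assumptions.
    """
    reader = (
        spark.readStream
        .format("delta")
        # Location where the tracked source schema (_schema_log_...) is kept.
        .option("schemaTrackingLocation", schema_tracking_location)
    )
    if starting_version is not None:
        # Per the docs, only honoured when the query starts without
        # recorded progress in its checkpoint.
        reader = reader.option("startingVersion", str(starting_version))
    return reader.table(source_table)


# Hypothetical usage (table name and path are placeholders):
# df = read_delta_stream(spark, "source_table", checkpoint_location, starting_version=x)
```

Keeping the `startingVersion` option conditional makes it easy to run the same stream with and without it when trying to narrow down which option is responsible.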
After some trial and error I found that starting_version is probably the cause of the issue: when I changed the schema on a stream without the starting_version option, it worked as intended and picked up the schema changes on the source table.
I'm a bit confused, since starting_version should only have an effect when starting the stream and otherwise be ignored. From the docs:
"They take effect only when starting a new streaming query. If a streaming query has started and the progress has been recorded in its checkpoint, these options are ignored." (https://docs.databricks.com/en/structured-streaming/delta-lake.html#specify-initial-position)
Has anybody had a similar problem? Is this intended behaviour? How can I solve it, and where could I raise this issue?

Volker
New Contributor III

I found that it actually is not related to specifying the starting_version.

I think I found the flaw in how the schema gets updated in the schemaTrackingLocation:

  • On the first readStream operation, the _schema_log_... gets created.
  • On the first writeStream operation, the schema gets written to the _schema_log_.
  • From then on, readStream reads the source table with the schema from the _schema_log_.
  • The schema in the _schema_log_ only gets updated on a writeStream operation, and only if a schema change is detected in the source table.
  • The check for whether the source table schema was updated happens after the check that the data schema and the target schema are compatible.
  • If the source table schema and the target table schema are updated at the same time, the stream fails, because it detects a mismatch between the data schema (which is still the original schema) and the target table schema.
  • Because the stream fails at this point, the schema in the _schema_log_ does not get updated either, so readStream keeps reading the original schema of the source table even though the schema has changed.

This behaviour is quite annoying, because you first need to adapt the schema of the source table, let the stream fail once it detects the schema change (which updates the schema in the _schema_log_), and only then update the schema of the target table.
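The ordering problem from the list above can be mimicked with a small toy model. This is only a sketch of the sequence as I understand it, not Delta Lake's actual implementation: the class, the exception names, and the list-of-column-names "schemas" are all made up for illustration.

```python
class SchemaMismatch(Exception):
    """Data schema and target table schema disagree."""


class SourceSchemaChanged(Exception):
    """Source schema differs from the tracked one; log updated, restart needed."""


class ToyStream:
    """Toy stand-in for a stream with a schema tracking log (hypothetical)."""

    def __init__(self, schema):
        self.source_schema = list(schema)
        self.target_schema = list(schema)
        self.schema_log = list(schema)  # stands in for _schema_log_

    def run_batch(self):
        # The read side uses the tracked schema, not the live source schema.
        data_schema = self.schema_log
        # Check 1: data schema vs. target schema comes first.
        if data_schema != self.target_schema:
            raise SchemaMismatch("data schema does not match target schema")
        # Check 2: only now is the source compared with the tracked schema.
        if self.source_schema != self.schema_log:
            self.schema_log = list(self.source_schema)
            raise SourceSchemaChanged("schema log updated, restart the stream")
        return "batch written"


def attempt(stream):
    """Run one batch and report the outcome instead of raising."""
    try:
        return stream.run_batch()
    except (SchemaMismatch, SourceSchemaChanged) as exc:
        return type(exc).__name__


old, new = ["id", "name"], ["id", "full_name"]

# Case 1: source and target changed at the same time. Check 1 always fails,
# so the schema log is never updated, no matter how often the stream restarts.
stuck = ToyStream(old)
stuck.source_schema = list(new)
stuck.target_schema = list(new)
stuck_results = [attempt(stuck) for _ in range(3)]

# Case 2: change the source first, let the stream fail once so the log is
# updated, then change the target and restart.
ordered = ToyStream(old)
ordered.source_schema = list(new)
first = attempt(ordered)
ordered.target_schema = list(new)
second = attempt(ordered)
```

In this model the first case never leaves the mismatch state, while the second case recovers after a single failed run, which matches the source-first, target-second order described above.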
I know that I could use schema evolution, but I would rather not use it if possible.
Does anybody have experience with this and know of a workaround?
