10-11-2024 12:21 AM
Hey Community,
We have a streaming pipeline that starts with Auto Loader ingesting data into a bronze table; this data is then picked up by another streaming job that transforms it and writes it into a silver table.
Now there are some schema changes (renaming a column and adding a new column) on the bronze table that we also want to propagate to the silver table.
We added the "schemaTrackingLocation" option to the stream that reads the data from bronze so that it does not fail on these non-additive schema changes, but the stream now also does not pick up the schema changes at all. The schema that I get from the readStream operation is still the same as before the schema change in bronze.
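For context, the read from bronze currently looks roughly like this (a simplified sketch; the table name and checkpoint path are placeholders):

bronze_df = spark.readStream \
    .format("delta") \
    .option("schemaTrackingLocation", checkpoint_path) \
    .table("bronze_table")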
Does anyone know how best to handle such schema changes with Structured Streaming, without too much stream downtime or too much overhead?
Thank you!
10-13-2024 11:27 AM
Hi @Volker,
How are you doing today?
As per my understanding, try using the mergeSchema option in your streaming read operation so that the stream picks up schema changes from the Bronze table automatically. This will help propagate the changes, like the column rename and the new column, to the Silver table. Additionally, make sure schema evolution is enabled for both the Bronze and Silver tables so these changes can be accommodated without restarting the stream. If you're dealing with column renames, you may need some manual handling to keep the downstream transformations consistent. Lastly, minimize downtime by using checkpointing, which ensures the stream continues where it left off after the schema changes are applied.
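For the column rename part, here is a rough sketch of what I mean by manual handling (the column names are just examples): if the Silver table should keep its old column name, map the new name back in the transformation; if Silver should adopt the new name, update the transformation and the Silver table schema instead.

# Example only: suppose bronze renamed "cust_id" to "customer_id"
# and Silver should keep using "cust_id"
silver_df = bronze_df.withColumnRenamed("customer_id", "cust_id")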
Give it a try and let me know.
Regards,
Brahma
10-14-2024 12:32 AM
Hi @Brahmareddy,
thanks for your reply! Using mergeSchema would still require me to use a new checkpoint for the stream, right? Then I could restart the stream with a new checkpoint and provide a starting version.
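Something like this is what I had in mind (just a sketch; the version number, table names, and checkpoint path are placeholders):

bronze_df = spark.readStream \
    .format("delta") \
    .option("startingVersion", 123) \
    .table("bronze_table")

# (transformations omitted)
bronze_df.writeStream \
    .format("delta") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", new_checkpoint_path) \
    .table("silver_table")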
10-14-2024 01:57 AM
I feel like
.option("schemaTrackingLocation", checkpoint_path)
should do the trick, but if I add this option and alter the source table schema, the schema that the stream reads is still the old one from before the schema changes, and thus I cannot propagate the schema changes from bronze to silver.
Could it be that I'm doing something wrong or missing some additional option?
10-14-2024 02:59 AM
@Volker Make sure that you also enable schema evolution in your write stream operation. You can do this by adding the mergeSchema option when writing to the silver table. This allows the destination table to automatically adapt to the new schema.
silver_df.writeStream \
.format("delta") \
.option("mergeSchema", "true") \
.option("checkpointLocation", checkpoint_path) \
.outputMode("append") \
.table("silver_table")
10-16-2024 04:19 AM
I adjusted the schema in both bronze and silver so that I do not need schema evolution. The problem is that the DataStreamReader does not pick up the schema changes in bronze.
I already figured out that it has something to do with also providing a starting_version: I tested it with and without starting_version, and when I provided a starting version the stream did not pick up the schema changes, whereas when I omit the starting version the stream does pick up the changes.
Now I'm a little confused, since according to the documentation starting_version should only matter when the query is first started and should be ignored afterwards.
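Concretely, this is the difference I observed (simplified; table name, path, and version number are placeholders, and I set the starting version via the startingVersion read option):

# With a starting version: the stream did NOT pick up the schema changes
df_with_version = spark.readStream \
    .format("delta") \
    .option("schemaTrackingLocation", checkpoint_path) \
    .option("startingVersion", 42) \
    .table("bronze_table")

# Without a starting version: the stream DID pick up the schema changes
df_without_version = spark.readStream \
    .format("delta") \
    .option("schemaTrackingLocation", checkpoint_path) \
    .table("bronze_table")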