Hi,
I have a Structured Streaming job that reads data from my silver layer and builds a gold layer using foreachBatch. The stream has been working fine, but the schema has now changed: some columns were deleted from the silver layer. When I try to restart the stream I get a schema mismatch error. Is it possible to handle such schema changes (especially deletions) without changing the checkpoint location?
eg:
def goldStream():
    query = (spark.readStream.format("delta")
             .option("maxFilesPerTrigger", 1)
             .option("mergeSchema", "true")
             .table("`ctg_dev`.demo.load_data_to_silver")
             .writeStream
             .option("checkpointLocation", "dbfs:/Volumes/ctg_dev/demo/managedvolume")
             .foreachBatch(transform_silver)
             .trigger(availableNow=True)
             .start())
    query.awaitTermination()
Error:
Please try restarting the query. If this issue repeats across query restarts without making progress, you have made an incompatible schema change and need to start your query from scratch using a new checkpoint directory.
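To make the question concrete, here is what I am considering trying, based on the Delta Lake documentation on non-additive schema changes (a `schemaTrackingLocation` read option plus the `allowSourceColumnRenameAndDrop` conf). The helper function and the schema-log path below are my own guesses, not something I have verified, and `transform_silver` is my existing foreachBatch function. Is this the intended approach for column deletions?

```python
# Sketch only: based on my reading of the Delta docs on non-additive schema
# changes. The helper name and the schema-log path are hypothetical.

def silver_reader_options(schema_track_path):
    # Read options for the silver stream. schemaTrackingLocation asks Delta
    # to keep a schema log for this stream so column drops/renames can be
    # tracked instead of failing the restart with a schema mismatch.
    return {
        "maxFilesPerTrigger": "1",
        "schemaTrackingLocation": schema_track_path,
    }

def goldStream(spark, transform_silver):
    # Per the docs, non-additive changes (renames/drops) also need an
    # explicit opt-in via this conf.
    spark.conf.set(
        "spark.databricks.delta.streaming.allowSourceColumnRenameAndDrop",
        "always")
    opts = silver_reader_options(
        "dbfs:/Volumes/ctg_dev/demo/managedvolume/_schema_log")  # hypothetical path
    query = (spark.readStream.format("delta")
             .options(**opts)
             .table("`ctg_dev`.demo.load_data_to_silver")
             .writeStream
             .option("checkpointLocation", "dbfs:/Volumes/ctg_dev/demo/managedvolume")
             .foreachBatch(transform_silver)
             .trigger(availableNow=True)
             .start())
    query.awaitTermination()
```

Would keeping the existing checkpoint location work once the schema log is in place, or is a new checkpoint still required here?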