Schema Deletion - Structured Streaming

Dp15
New Contributor III

Hi,
I have a Structured Streaming query that reads data from my silver layer and builds a gold layer using foreachBatch. The stream had been working fine, but there has now been a schema change in which some columns were deleted from the silver layer. When I try to run the stream, I get a schema mismatch error. Is it possible to handle such schema changes (especially deletions) without changing the checkpoint location?

e.g.:

def goldStream():
    # transform_silver(batch_df, batch_id) is defined elsewhere; it writes the gold table
    query = (spark.readStream.format("delta")
            .option("maxFilesPerTrigger", 1)
            .option("mergeSchema", "true")
            .table("`ctg_dev`.demo.load_data_to_silver")
            .writeStream
            .option("checkpointLocation", "dbfs:/Volumes/ctg_dev/demo/managedvolume")
            .foreachBatch(transform_silver)
            .trigger(availableNow=True)
            .start())
    query.awaitTermination()

Error:
Please try restarting the query. If this issue repeats across query restarts without making progress, you have made an incompatible schema change and need to start your query from scratch using a new checkpoint directory.

1 ACCEPTED SOLUTION


Kaniz
Community Manager

Hi @Dp15, handling schema changes in Structured Streaming can be tricky, especially when dealing with deletions or other modifications.

Let’s explore some strategies to address this issue:

Schema Evolution:

  • Schema evolution allows you to handle changes to the schema of your streaming data over time, so you can read data written with a newer schema while still processing data written with the old one.
  • When using Delta Lake, you can set the mergeSchema option to true (as in your example). This allows new columns to be added to the target schema with compatible data types.
  • However, column deletions are not covered by schema evolution. If columns are deleted from the silver layer, the stream fails with a schema mismatch error.
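One partial mitigation inside foreachBatch is to project each micro-batch onto only the columns that still exist, so the gold-side transform never references a deleted column. The helper below is a hypothetical sketch (the column names and `EXPECTED_GOLD_COLS` are assumptions, not from the thread), and it does not by itself fix the checkpoint incompatibility:

```python
# Hedged sketch: keep only the expected gold columns that survived a
# deletion in the silver schema. `expected_cols` is an assumed list of
# columns the gold transform wants; `batch_cols` would come from
# batch_df.columns inside foreachBatch.
def surviving_columns(batch_cols, expected_cols):
    """Return the expected columns still present in the batch, in order."""
    present = set(batch_cols)
    return [c for c in expected_cols if c in present]

# Inside foreachBatch one might then do (illustrative, not verified):
#   cols = surviving_columns(batch_df.columns, EXPECTED_GOLD_COLS)
#   batch_df.select(*cols) ...
```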

Checkpointing and Schema Changes:

  • The error message you encountered suggests that the schema change is incompatible with the existing checkpoint directory.
  • Checkpointing is essential for fault tolerance and exactly-once processing in Structured Streaming. It relies on the schema information stored in the checkpoint directory.
  • When there are schema changes (including deletions), the checkpoint directory becomes incompatible, and you need to start from scratch with a new checkpoint location.
  • Unfortunately, there’s no direct way to handle schema deletions without changing the checkpoint location.

Workaround:

  • While you cannot avoid changing the checkpoint location, you can minimize the impact:
    • Create a New Checkpoint Directory: Start your streaming query with a new checkpoint directory. This ensures compatibility with the updated schema.
    • Migrate Existing Data: If possible, migrate the existing data from the old gold layer to the new one. You can use batch processing or other methods to achieve this.
    • Update Consumers: If downstream consumers rely on the gold layer, update them to use the new checkpoint location and schema.
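The "new checkpoint directory" step above can be sketched as a simple path-versioning scheme, so a schema-incompatible restart gets a fresh checkpoint without clobbering the old one (useful if you need to roll back). The base path is taken from the question; the versioning convention itself is an assumption:

```python
# Hypothetical helper: derive a versioned checkpoint directory for a
# restart after an incompatible schema change. The old checkpoint stays
# untouched under the base path; the new run writes under /vN.
def versioned_checkpoint(base: str, version: int) -> str:
    """Append a version suffix to a checkpoint base path."""
    return f"{base.rstrip('/')}/v{version}"

# e.g. pass versioned_checkpoint("dbfs:/Volumes/ctg_dev/demo/managedvolume", 2)
# as the checkpointLocation option when restarting the stream.
```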

Considerations:

  • Remember that schema changes (especially deletions) can affect the correctness of your processing. Ensure thorough testing after making such changes.
  • Backward compatibility (reading new data with the old schema) is generally easier to achieve than forward compatibility (reading old data with a new schema).

In summary, while changing the checkpoint location, consider migrating existing data and updating downstream consumers to minimize disruptions caused by schema changes. Remember to test thoroughly to ensure correctness. 🚀



Dp15
New Contributor III

@Kaniz Thank you so much for the detailed explanation.
