Databricks Autoloader Schema Evolution throws StateSchemaNotCompatible exception
11-23-2023 01:46 AM - edited 11-23-2023 01:49 AM
I am trying to use Databricks Autoloader for a very simple use case:
Reading JSON files from S3 and loading them into a Delta table, with schema inference and evolution.
This is my code:
self.spark \
    .readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaLocation", f"{self.target_s3_bucket}/_schema/{source_table_name}") \
    .load(f"{self.source_s3_bucket}/{source_table_name}") \
    .distinct() \
    .writeStream \
    .trigger(availableNow=True) \
    .format("delta") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint/{source_table_name}") \
    .option("streamName", source_table_name) \
    .start(f"{self.target_s3_bucket}/{target_table_name}")
When a JSON with an unknown column arrives, the Stream fails, as expected, with a NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception.
But when I retry the job, I get the following exception:
StateSchemaNotCompatible: Provided schema doesn't match to the schema for existing state! Please note that Spark allow difference of field name: check count of fields and data type of each field.
This is my first time using Autoloader, am I doing something obviously wrong?
I've already posted this to StackOverflow, but the answers there weren't that helpful.
Labels: Spark
11-23-2023 07:01 AM
Hey @Retired_mod,
Thank you for the answer.
- Clearing checkpoint data is, unfortunately, not an option. The Stream would reprocess all the data, which is not what I want since it runs incrementally.
- Manual schema declaration is also not an option since I want to add new columns.
What confuses me is that the StateSchemaNotCompatible exception is emitted from Spark Structured Streaming and is not an AutoLoader exception.
When I add a new column to the base table, the Stream fails with the NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception, which is expected when specifying addNewColumns.
When I restart the Stream, it fails with StateSchemaNotCompatible, which shouldn't be the case since the schema should be updated as soon as AutoLoader fails with the NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception.
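For context, this is roughly the restart behaviour I am relying on, as a minimal sketch; start_stream() is a hypothetical helper that builds and starts the exact readStream/writeStream query shown above:

MAX_ATTEMPTS = 3

for attempt in range(MAX_ATTEMPTS):
    query = start_stream()  # hypothetical wrapper around the query shown above
    try:
        # With trigger(availableNow=True) this returns once the backlog is processed.
        query.awaitTermination()
        break
    except Exception as e:
        if "NEW_FIELDS_IN_RECORD_WITH_FILE_PATH" in str(e):
            # AutoLoader has already merged the new columns into the schema at
            # cloudFiles.schemaLocation, so a plain restart should pick them up.
            continue
        raise

In practice, that second attempt is exactly where the StateSchemaNotCompatible error appears instead.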
My use case seems straightforward. I cannot imagine that I am the only one trying to run AutoLoader with:
- Structured Streaming
- JSON files as source
- Column Type Inference
- Automated Schema Evolution
- Delta as the target
12-12-2024 06:39 AM
@robertkoss I have the exact same problem... have you found a solution?
12-12-2024 07:37 AM
Hey, the problem is the distinct() call, because it is a stateful operation. Structured Streaming stores the state schema in the checkpoint, so once AutoLoader evolves the input schema, the saved state schema no longer matches and the restarted query fails with StateSchemaNotCompatible.
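One way around it, if per-batch deduplication is acceptable, is to drop distinct() from the streaming plan and deduplicate inside foreachBatch instead, so no streaming state (and no state schema) ends up in the checkpoint. A minimal sketch, reusing the paths and variable names from the original post:

def write_batch(batch_df, batch_id):
    # distinct() on the static micro-batch DataFrame is stateless
    batch_df.distinct() \
        .write \
        .format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .save(f"{self.target_s3_bucket}/{target_table_name}")

self.spark \
    .readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaLocation", f"{self.target_s3_bucket}/_schema/{source_table_name}") \
    .load(f"{self.source_s3_bucket}/{source_table_name}") \
    .writeStream \
    .trigger(availableNow=True) \
    .option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint/{source_table_name}") \
    .foreachBatch(write_batch) \
    .start()

The trade-off is that this only removes duplicates within each micro-batch, not across batches.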