
Databricks Autoloader Schema Evolution throws StateSchemaNotCompatible exception

robertkoss
New Contributor III

I am trying to use Databricks Autoloader for a very simple use case:

Reading JSON files from S3 and loading them into a Delta table, with schema inference and evolution.

This is my code:

(
    self.spark.readStream
    # Auto Loader source: infer column types and track the schema in S3
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", f"{self.target_s3_bucket}/_schema/{source_table_name}")
    .load(f"{self.source_s3_bucket}/{source_table_name}")
    # stateful deduplication across the whole stream, keyed on all columns
    .distinct()
    .writeStream
    .trigger(availableNow=True)
    .format("delta")
    # let the Delta sink pick up new columns
    .option("mergeSchema", "true")
    .option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint/{source_table_name}")
    .queryName(source_table_name)  # queryName() is the supported way to label a stream
    .start(f"{self.target_s3_bucket}/{target_table_name}")
)

When a JSON file with an unknown column arrives, the stream fails, as expected, with a NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception.

But when I retry the job, I get the following exception:

StateSchemaNotCompatible: Provided schema doesn't match to the schema for existing state! Please note that Spark allow difference of field name: check count of fields and data type of each field.

This is my first time using Autoloader, am I doing something obviously wrong?

I've already posted this to Stack Overflow, but the answers I got were not that helpful:

https://stackoverflow.com/questions/77482302/databricks-autoloader-schema-evolution-throws-statesche...


Kaniz_Fatma
Community Manager

Hi @robertkoss, Let’s dive into the intricacies of Databricks Autoloader and tackle the StateSchemaNotCompatible exception you’re encountering.


Schema Evolution and Autoloader:

  • Autoloader is designed to handle schema evolution: in the default addNewColumns mode it updates the tracked schema when new columns are detected, stops the stream, and expects a restart to pick up the merged schema (see the sketch after this list).
  • The StateSchemaNotCompatible exception, however, is raised by the Structured Streaming state store, not by Autoloader: it means the row schema in the current run no longer matches the state schema recorded in the checkpoint.
  • Your .distinct() is the stateful operation here: streaming deduplication keeps state keyed on all columns, and that state schema is fixed when the checkpoint is first created.
  • After the restart, the evolved input schema (with the new column) no longer matches the state written under the old schema, which is exactly the mismatch the error reports.
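
The intended pattern with addNewColumns is therefore to restart the stream after it stops; with an availableNow trigger this is usually done via job retries or a small driver-side loop. A minimal sketch (build_autoloader_query is a hypothetical helper wrapping your exact query, and matching on the error string is a simplification):

def start_stream():
    # assumption: build_autoloader_query() wraps the readStream/writeStream
    # chain from your snippet and returns the StreamingQuery from .start()
    return build_autoloader_query()

for attempt in range(3):  # bounded retries so a genuine failure still surfaces
    query = start_stream()
    try:
        query.awaitTermination()
        break  # with trigger(availableNow=True) the stream finishes cleanly
    except Exception as e:
        # addNewColumns stops the stream on purpose when new fields appear;
        # on the next start() Auto Loader reads the merged schema from schemaLocation
        if "NEW_FIELDS_IN_RECORD_WITH_FILE_PATH" not in str(e):
            raise

In your case, though, it is this very restart that trips over the state store, which points at the deduplication step rather than at Autoloader itself.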

Possible Solutions:

  • Clear the checkpoint data so the state store is rebuilt against the new schema. (This forces the stream to reprocess everything, so it is only viable if a full reload is acceptable.)
  • Declare the schema manually with .schema(...) instead of relying on inference, so it never changes between runs.
  • Take the stateful deduplication out of the stream, for example by replacing .distinct() with a per-batch dedupe in foreachBatch, so there is no long-lived state schema to break (see the sketch below).
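Here is a rough sketch of that foreachBatch variant (paths and names mirror your snippet; note the behavior change: this deduplicates only within each micro-batch, and cross-batch dedup would need a MERGE on key columns instead of a blind append):

def dedupe_and_append(batch_df, batch_id):
    # dedupe inside each micro-batch only; no long-lived streaming state is kept,
    # so there is no state schema to clash with after the input schema evolves
    (batch_df.dropDuplicates()
        .write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save(f"{target_s3_bucket}/{target_table_name}"))

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", f"{target_s3_bucket}/_schema/{source_table_name}")
    .load(f"{source_s3_bucket}/{source_table_name}")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", f"{target_s3_bucket}/_checkpoint/{source_table_name}")
    .foreachBatch(dedupe_and_append)
    .start())
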
Keep Experimenting:

  • As this is your first time using Autoloader, don’t worry—you’re not alone! Schema evolution can be tricky, especially when dealing with dynamic data.
  • Experiment with the suggestions above, and feel free to iterate. Sometimes, a small tweak can make a big difference.

Remember, even in the world of data engineering, a little schema evolution can lead to great insights! 🌟 


If you have any more questions or need further assistance, feel free to ask.

robertkoss
New Contributor III

Hey @Kaniz_Fatma,

Thank you for the answer. 

  • Clearing the checkpoint data is, unfortunately, not an option. The stream would reprocess all the data again, which is not what I want since it runs incrementally.
  • Manual schema declaration is also not an option, since I want new columns to be picked up automatically.

What confuses me is that the StateSchemaNotCompatible exception is emitted from Spark Structured Streaming and is not an AutoLoader exception. 

When I add a new column to the base table, the Stream fails with the NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception, which is expected when specifying addNewColumns.

When I restart the Stream, it fails with StateSchemaNotCompatible, which shouldn't be the case since the schema should be updated as soon as AutoLoader fails with the NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception.
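
One way to check whether AutoLoader really records the new schema before the restart is to look into the schema location (a quick sketch, assuming a Databricks notebook with dbutils; as far as I can tell the versioned schema files live in a _schemas subdirectory):

# assumption: Auto Loader keeps its versioned schema files under <schemaLocation>/_schemas
schema_log = f"{target_s3_bucket}/_schema/{source_table_name}/_schemas"
versions = sorted(dbutils.fs.ls(schema_log), key=lambda f: f.name)
for v in versions:
    print(v.name)                          # one numbered file per inferred schema version
print(dbutils.fs.head(versions[-1].path))  # the most recently tracked schema

If a new schema version shows up there and the restart still fails, that would confirm the problem sits in the streaming state rather than in AutoLoader's schema tracking.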

My use case seems straightforward. I cannot imagine that I am the only one trying to run AutoLoader with:

  • Structured Streaming
  • JSON files as source
  • Column Type Inference
  • Automated Schema Evolution
  • Delta as the target
