Databricks Autoloader Schema Evolution throws StateSchemaNotCompatible exception

robertkoss
New Contributor II

I am trying to use Databricks Autoloader for a very simple use case:

Reading JSONs from S3 and loading them into a delta table, with schema inference and evolution.

This is my code:

self.spark \
      .readStream \
      .format("cloudFiles") \
      .option("cloudFiles.format", "json") \
      .option("cloudFiles.inferColumnTypes", "true") \
      .option("cloudFiles.schemaLocation", f"{self.target_s3_bucket}/_schema/{source_table_name}") \
      .load(f"{self.source_s3_bucket}/{source_table_name}") \
      .distinct() \
      .writeStream \
      .trigger(availableNow=True) \
      .format("delta") \
      .option("mergeSchema", "true") \
      .option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint/{source_table_name}") \
      .option("streamName", source_table_name) \
      .start(f"{self.target_s3_bucket}/{target_table_name}")

When a JSON with an unknown column arrives, the Stream fails, as expected, with a NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception.

But when I retry the job, I get the following exception:

StateSchemaNotCompatible: Provided schema doesn't match to the schema for existing state! Please note that Spark allow difference of field name: check count of fields and data type of each field.

This is my first time using Autoloader; am I doing something obviously wrong?

I've already posted this to Stack Overflow and got some answers, though they weren't that helpful:

https://stackoverflow.com/questions/77482302/databricks-autoloader-schema-evolution-throws-statesche...

2 REPLIES

Kaniz
Community Manager

Hi @robertkoss, let’s dive into the intricacies of Databricks Autoloader and tackle the StateSchemaNotCompatible exception you’re encountering.

 

Schema Evolution and Autoloader:

  • Autoloader is designed to handle schema evolution by updating the schema when new columns are detected.
  • However, the StateSchemaNotCompatible exception typically occurs when there’s a mismatch between the inferred schema in the current stream and the schema stored in the checkpoint or state store.
  • Upon retrying the job, Spark compares the new inferred schema with the old schema stored in the checkpoint, and inconsistencies may arise.
  • Schema evolution with addNewColumns (the default mode once a schema location is set) is expected to update the stored schema with new columns as they are detected; see the sketch after this list.
  • If the checkpoint data isn’t in sync with these updates, it might lead to the StateSchemaNotCompatible issue.
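
For reference, a minimal sketch of the read side with the evolution mode spelled out (bucket and table names are placeholders; addNewColumns is already the default whenever a schemaLocation is set):

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    # addNewColumns: fail the stream when new columns arrive, then merge them
    # into the tracked schema so the next run can pick them up
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaLocation", "s3://my-bucket/_schema/my_table")
    .load("s3://my-bucket/source/my_table"))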

Possible Solutions:

  • Clear the checkpoint data so the stream starts with state that matches the evolved schema; note that this means reprocessing the source data.
  • Declare the schema manually instead of relying on inference, so the stream's schema stays fixed; a minimal sketch follows below.
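
A minimal sketch of the manual-schema route (field names and path are placeholders); with an explicit schema the stream no longer depends on inference, at the cost of editing the code whenever new columns appear:

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical fixed schema; replace with the actual JSON fields
fixed_schema = StructType([
    StructField("id", LongType()),
    StructField("payload", StringType()),
])

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(fixed_schema)   # fixed schema: no inference, no automatic evolution
    .load("s3://my-bucket/source/my_table"))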

Keep Experimenting:

  • As this is your first time using Autoloader, don’t worry—you’re not alone! Schema evolution can be tricky, especially when dealing with dynamic data.
  • Experiment with the suggestions above, and feel free to iterate. Sometimes, a small tweak can make a big difference.

Remember, even in the world of data engineering, a little schema evolution can lead to great insights! 🌟 

 

If you have any more questions or need further assistance, feel free to ask.

robertkoss
New Contributor II

Hey @Kaniz ,

Thank you for the answer. 

  • Clearing checkpoint data is, unfortunately, not an option. The Stream would reprocess all the data again, and this is not what I want since the Stream is running incrementally.
  • Manual schema declaration is also not an option since I want to add new columns.

What confuses me is that the StateSchemaNotCompatible exception is emitted from Spark Structured Streaming and is not an AutoLoader exception. 

When I add a new column to the base table, the Stream fails with the NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception, which is expected when specifying addNewColumns.

When I restart the Stream, it fails with StateSchemaNotCompatible, which shouldn't happen, since the schema should already have been updated when AutoLoader failed with the NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception.
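
For reference, the restart flow I am describing is roughly the following (a rough sketch stripped down from the job code above, with placeholder paths): the first run is expected to fail on the new column, and the rerun should then pick up the merged schema.

def run_stream():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaLocation", "s3://target-bucket/_schema/my_table")
        .load("s3://source-bucket/my_table")
        .writeStream
        .trigger(availableNow=True)
        .format("delta")
        .option("mergeSchema", "true")
        .option("checkpointLocation", "s3://target-bucket/_checkpoint/my_table")
        .start("s3://target-bucket/my_table"))

try:
    run_stream().awaitTermination()
except Exception:
    # The first failure is expected (NEW_FIELDS_IN_RECORD_WITH_FILE_PATH); by now
    # the schema location should have been updated, so this retry should succeed.
    # Instead it fails with StateSchemaNotCompatible.
    run_stream().awaitTermination()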

My use case seems straightforward. I cannot imagine that I am the only one trying to run AutoLoader with:

  • Structured Streaming
  • JSON files as source
  • Column Type Inference
  • Automated Schema Evolution
  • Delta as the target

 
