cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Databricks Autoloader Schema Evolution throws StateSchemaNotCompatible exception

robertkoss
New Contributor III

I am trying to use Databricks Autoloader for a very simple use case:

Reading JSONs from S3 and loading them into a delta table, with schema inference and evolution.

This is my code:

self.spark \
      .readStream \
      .format("cloudFiles") \
      .option("cloudFiles.format", "json") \
      .option("cloudFiles.inferColumnTypes", "true") \
      .option("cloudFiles.schemaLocation", f"{self.target_s3_bucket}/_schema/{source_table_name}") \
      .load(f"{self.source_s3_bucket}/{source_table_name}") \
      .distinct() \
      .writeStream \
      .trigger(availableNow=True) \
      .format("delta") \
      .option("mergeSchema", "true") \
      .option("checkpointLocation", f"{self.target_s3_bucket}/_checkpoint/{source_table_name}") \
      .option("streamName", source_table_name) \
      .start(f"{self.target_s3_bucket}/{target_table_name}")

When a JSON with an unknown column arrives, the Stream fails, as expected, with a NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception.

But when I retry the job, I get the following exception:

StateSchemaNotCompatible: Provided schema doesn't match to the schema for existing state! Please note that Spark allow difference of field name: check count of fields and data type of each field.

This is my first time using Autoloader, am I doing something obviously wrong?

I've posted this already to StackOverflow and got some answers that were not that helpful though: 

https://stackoverflow.com/questions/77482302/databricks-autoloader-schema-evolution-throws-statesche...

3 REPLIES 3

Hey @Retired_mod ,

Thank you for the answer. 

  • Clearing checkpoint data is, unfortunately, not an option. The Stream would reprocess all the data again, and this is not what I want since the Stream is running incrementally.
  • Manual schema declaration is also not an option since I want to add new columns.

What confuses me is that the StateSchemaNotCompatible exception is emitted from Spark Structured Streaming and is not an AutoLoader exception. 

When I add a new column to the base table, the Stream fails with the NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception, which is expected when specifying addNewColumns.

When I restart the Stream, it fails with StateSchemaNotCompatible, which shouldn't be the case since the schema should be updated as soon as AutoLoader fails with the NEW_FIELDS_IN_RECORD_WITH_FILE_PATH exception.

My use case seems to be straightforward. I can not imagine that I am the only one that tries to run AutoLoader with:

  • Structured Streaming
  • JSON files as source
  • Column Type Inference
  • Automated Schema Evolution
  • Delta as the target

 

Nes_Hdr
New Contributor III

@robertkoss I have the exact same problem... have you found a solution ? 

 

robertkoss
New Contributor III

Hey, the problem is 

distinct()

because it requires a state.

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group