"Detected schema change" error while reading from delta table in streaming after applying "ALTER COLUMN DROP NOT NULL" to more than one columns.

Anatoly
New Contributor III

Hi!

I have a Delta table and a process that reads a stream from this table.

I need to drop the NOT NULL constraint from some of the columns of this table.

The first drop command does not affect the reading stream.

But the second command results in an error:

StreamingQueryException: Detected schema change:
old schema: root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- c: timestamp (nullable = false)

new schema: root
 |-- a: string (nullable = true)
 |-- b: string (nullable = false)
 |-- c: timestamp (nullable = false)

Full Python notebook:

# Databricks notebook source
# MAGIC %sql
# MAGIC CREATE SCHEMA temp_schema
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC CREATE TABLE temp_schema.temp_stream_table (
# MAGIC   a STRING NOT NULL,
# MAGIC   b STRING NOT NULL,
# MAGIC   c TIMESTAMP NOT NULL
# MAGIC   )
# MAGIC LOCATION "dbfs:/tmp/temp_stream_table"
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC INSERT INTO temp_schema.temp_stream_table
# MAGIC VALUES ("a", "b", CURRENT_TIMESTAMP), ("c", "d", CURRENT_TIMESTAMP)
 
# COMMAND ----------
 
def run_stream():
    # Stream the table through a no-op foreachBatch sink until all available data is processed.
    return (
        spark.readStream
        .option("mergeSchema", "true")
        .table("temp_schema.temp_stream_table")
        .writeStream
        .trigger(availableNow=True)
        .option("checkpointLocation", "/dbfs/tmp/temp_stream_checkpoint")
        .foreachBatch(lambda df, _id: None)
        .start()
        .awaitTermination()
    )
 
# COMMAND ----------
 
run_stream()
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC ALTER TABLE temp_schema.temp_stream_table
# MAGIC ALTER COLUMN a DROP NOT NULL;
 
# COMMAND ----------
 
run_stream()
# Success!
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC ALTER TABLE temp_schema.temp_stream_table
# MAGIC ALTER COLUMN b DROP NOT NULL;
 
# COMMAND ----------
 
run_stream()
### Failure here: StreamingQueryException: Detected schema change ...
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC DROP TABLE temp_schema.temp_stream_table;
# MAGIC DROP SCHEMA temp_schema
 
# COMMAND ----------
 
dbutils.fs.rm("dbfs:/tmp/temp_stream_checkpoint", True)
dbutils.fs.rm("dbfs:/tmp/temp_stream_table", True)

Is it a bug?

9 REPLIES

Kaniz
Community Manager

Hi @Anatoly Tikhonov, most probably the checkpoint directory /dbfs/tmp/temp_stream_checkpoint still holds data from a previous run, and that data might have a different schema than the current one, so the stream raises this kind of exception when it restarts against the same directory.
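
A quick way to check is to list what the checkpoint path actually contains before restarting the stream. A minimal sketch, assuming the paths from the notebook above; note that a schemeless path like /dbfs/tmp/... passed as a Spark option resolves against DBFS itself (dbfs:/dbfs/tmp/...), while dbutils.fs.rm("dbfs:/tmp/...") targets dbfs:/tmp/..., so the checkpoint option and the cleanup cell may not point at the same location:

# List both candidate checkpoint locations; an exception here usually means the path does not exist.
for path in ["dbfs:/tmp/temp_stream_checkpoint", "dbfs:/dbfs/tmp/temp_stream_checkpoint"]:
    try:
        print(path, [f.name for f in dbutils.fs.ls(path)])
    except Exception:
        print(path, "-> not found")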

Anatoly
New Contributor III

Hi @Kaniz Fatma,

I clean up the checkpoint directory in the last cell of the notebook (the dbutils.fs.rm calls), so it should be empty.

Also, no new data is inserted into the table; the error occurs on read.

Kaniz
Community Manager

Hi @Anatoly Tikhonov, do you see the difference between the two schemas in the screenshot you've shared?

Anatoly
New Contributor III

Yes, for field "b": nullable=true vs nullable=false.

Kaniz
Community Manager

Hi @Anatoly Tikhonov, can you please specify your DBR version?

In case you are on DBR 11.2: if you try to drop a Delta table constraint by name and that constraint does not exist, you will now get an error. To get the previous behavior, which did not throw an error when the constraint did not exist, you must now use IF EXISTS. See ALTER TABLE.
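
For reference, a minimal sketch of the IF EXISTS form (the constraint name is hypothetical; this syntax applies to named CHECK constraints, while NOT NULL is dropped via ALTER COLUMN as in your notebook):

# Drop a named CHECK constraint without failing when it is absent (hypothetical constraint name).
spark.sql("""
    ALTER TABLE temp_schema.temp_stream_table
    DROP CONSTRAINT IF EXISTS my_check_constraint
""")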

Anatoly
New Contributor III

DBR 10.4 LTS.

Also tested on 11.3 LTS - same result.

Anatoly
New Contributor III

@Kaniz Fatma The command used to drop the constraint is:

ALTER TABLE temp_schema.temp_stream_table
ALTER COLUMN a DROP NOT NULL;

That is, I want to make the column nullable, not drop the column itself.

The error appears at the readStream step, not when the constraint is dropped.

Kaniz
Community Manager

Hi @Anatoly Tikhonov, I understand your point here.

The read stream will throw an exception if there are updates or deletes in your Delta source. This is also stated in the Databricks documentation:

Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source.

If you set ignoreChanges to true, it will not throw an exception, but it will give you the updated rows plus rows that may already have been processed.

This is because everything in a Delta table happens at the file level.

For example, if you update a single row, roughly the following happens:

  1. Find and read the file that contains the record to be updated.
  2. Write a new file containing the updated record plus all the other data from the old file.
  3. Mark the old file as removed and the new file as added in the transaction log.
  4. Your read stream reads the whole new file as "new" records. This means you can get duplicates in your stream.

This is also mentioned in the docs.

ignoreChanges: re-process updates if files had to be rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE.

Unchanged rows may still be emitted; therefore, your downstream consumers should be able to handle duplicates. ...
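
A minimal sketch of applying that option to the stream from this thread (the fresh checkpoint path is an assumption, chosen so old checkpoint state does not interfere):

# Tolerate rewritten files in the Delta source instead of failing; expect duplicate rows downstream.
(spark.readStream
    .option("ignoreChanges", "true")
    .table("temp_schema.temp_stream_table")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "dbfs:/tmp/temp_stream_checkpoint_ic")  # hypothetical fresh checkpoint
    .foreachBatch(lambda df, _id: None)
    .start()
    .awaitTermination())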

You'll have to decide if this is OK for your use case. If you need to handle updates and deletes specifically, Databricks offers Change Data Feed, which you can enable on Delta tables. This gives you row-level details about inserts, updates, and deletes (at the cost of some extra storage and IO).
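
A minimal sketch of enabling and streaming the change feed for this table (property and option names are the documented ones; a streaming read starts from the latest version by default):

# Enable Change Data Feed on the existing table.
spark.sql("""
    ALTER TABLE temp_schema.temp_stream_table
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Stream row-level changes; Delta adds _change_type, _commit_version and _commit_timestamp columns.
changes = (spark.readStream
    .option("readChangeFeed", "true")
    .table("temp_schema.temp_stream_table"))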

Anonymous
Not applicable

Hi @Anatoly Tikhonov​ 

Hope everything is going great.

Does @Kaniz Fatma's response answer your question? If yes, would you be happy to mark it as best so that other members can find the solution more quickly?

We'd love to hear from you.

Thanks!
