Adding a new column triggers reprocessing of Auto Loader source table

cosminsanda
New Contributor III

I have a source table A in Unity Catalog. This table is constantly written to and is a streaming table.
I also have another table B in Unity Catalog. This is a managed table with liquid clustering.

Using Auto Loader I move new data from A to B using a code similar to the following: 

streaming_query = (
    spark.readStream.option("ignoreDeletes", "true")
    .table("catalog.bronze.A")
    .selectExpr(
        "column_1",
        "column_2"
    )
    .writeStream.option("checkpointLocation", "/Volumes/catalog/silver/checkpoints/B")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("B")
)

streaming_query.awaitTermination(timeout=300)

Everything was running smoothly, but I then decided to add a new column to the SELECT, so I end up with a SELECT similar to:

.selectExpr(
    "column_1",
    "column_2",
    "column_3"
)

Now, after this configuration change, all the existing data in B got duplicated. The new column was added correctly to the duplicated records. It is as if the whole of table A was reprocessed and appended to the already existing data in B.

Is this the expected behaviour given my configuration? What can I do to avoid this in the future, so that the old data simply has NULL in the added columns?

9 REPLIES

Kaniz
Community Manager

Hi @cosminsanda

  • Ensure that the checkpoint location for your streaming query is correctly set. If the checkpoint location is not properly managed, it can lead to unexpected behaviour.
  • When you modify the SELECT query by adding a new column, it triggers schema evolution.
  • Schema evolution allows for changes in the schema (such as adding new columns) without breaking existing data.
  • However, it’s essential to handle schema evolution correctly to avoid data duplication.
  • You’ve set the .option("mergeSchema", "true") in your writeStream configuration.
  • This option allows the new schema to be merged with the existing schema.
  • Make sure that this option is indeed necessary for your use case. If not, consider removing it.
  • To prevent data duplication, you can perform data deduplication based on a unique identifier (e.g., a primary key); a sketch follows at the end of this list.
  • If your data has a natural key or timestamp, consider using it to identify unique records during the merge process.
  • When adding a new column, existing records should have NULL values for that column.
  • You might need to manually update the existing data in table B to set the new column to NULL for old records.
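As a rough illustration of the deduplication point above, here is a minimal one-off cleanup sketch. It assumes column_1 acts as a unique key and that "B" resolves to your silver table; neither is stated in the post, so adjust the names, and run it with the streaming query stopped.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

b = spark.table("B")  # adjust to the fully qualified name of the silver table

# Keep one row per assumed key. Ordering NULLs last keeps the reprocessed copy
# (column_3 populated); use asc_nulls_first() instead to keep the original rows,
# which leaves NULL in column_3 for the old data.
w = Window.partitionBy("column_1").orderBy(F.col("column_3").desc_nulls_last())

deduped = (
    b.withColumn("rn", F.row_number().over(w))
    .filter("rn = 1")
    .drop("rn")
)

# Overwriting a Delta table that was read in the same job relies on snapshot
# isolation; if in doubt, stage the result in a temporary table first.
deduped.write.mode("overwrite").saveAsTable("B")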

cosminsanda
New Contributor III

Ok, so how does schema evolution relate to .option("mergeSchema", "true") then? Are they different things? Do they step on each other's toes?

If I make non-breaking changes to the schema (just adding), am I to understand that I can simply remove the .option("mergeSchema", "true")?

 

cosminsanda
New Contributor III
  • To prevent data duplication, you can perform data deduplication based on a unique identifier (e.g., a primary key). -> I don't want to do data deduplication every time I add a column.
  • If your data has a natural key or timestamp, consider using it to identify unique records during the merge process. -> How?

-werners-
Esteemed Contributor III

Ok, I might be totally wrong here, but it seems you are not using Auto Loader to move data from A to B. Auto Loader is an easy way to process ingested files, whereas here you run a Spark Structured Streaming query on table A.
When you change the selectExpr, the streaming query is restarted and the whole table is sent to B.
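For comparison, an actual Auto Loader read goes through the cloudFiles source against a file location rather than a table. A minimal sketch, with made-up paths and an assumed file format:

df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")  # assumed format; use whatever the landed files are
    .option("cloudFiles.schemaLocation", "/Volumes/catalog/bronze/schemas/A")  # made-up path
    .load("/Volumes/catalog/bronze/landing/A")  # made-up path
)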

cosminsanda
New Contributor III

Indeed, @-werners-, you might be right, since I don't really use the cloudFiles functionality, which is really what Auto Loader is about.

In any case, even if it's just regular Structured Streaming, given that I have a configured checkpoint location where the processing progress is recorded, I don't see how reprocessing the whole table is a reasonable thing to do.

-werners-
Esteemed Contributor III

The purpose of checkpoints is to recover after a failure. In your case, the streaming query itself has changed. Structured Streaming isn't stateless, which means that, in general, checkpoints cannot be reused when the query is changed.
Depending on the case, you might be able to recover semantics after changes:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-....
In your case, the schema changes:
Structured Streaming automatically checkpoints the state data to fault-tolerant storage (for example, HDFS, AWS S3, Azure Blob storage) and restores it after restart. However, this assumes that the schema of the state data remains the same across restarts.
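If a query change really does invalidate the checkpoint, one common (if blunt) pattern is to accept a full rebuild: empty the target, restart the query against a fresh checkpoint location, and let it reload everything once. A rough sketch, where the new checkpoint path is made up:

# Deliberate full rebuild of B after a breaking query change.
spark.sql("TRUNCATE TABLE B")

streaming_query = (
    spark.readStream.option("ignoreDeletes", "true")
    .table("catalog.bronze.A")
    .selectExpr("column_1", "column_2", "column_3")
    .writeStream.option("checkpointLocation", "/Volumes/catalog/silver/checkpoints/B_v2")
    .trigger(availableNow=True)
    .toTable("B")
)
streaming_query.awaitTermination(timeout=300)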

cosminsanda
New Contributor III

What if I alter my target table B in advance, so that it contains the new columns before the query starts writing to them?

-werners-
Esteemed Contributor III

I don't think that would work as you would still have to change the query to select the new columns (unless you apply a select *).
Here is an overview of what can be changed and what not:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-...

Your case falls under Changes in projection / filter / map-like operations
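To make the "select *" remark concrete, this is roughly what that variant would look like: widen B up front and let the query pass every column through so its text never has to change. The column name and type are assumptions, and whether this actually sidesteps the reprocessing is exactly the open question in this thread.

# Assumed column name/type -- match them to the real column in A.
spark.sql("ALTER TABLE B ADD COLUMNS (column_3 STRING)")

streaming_query = (
    spark.readStream.option("ignoreDeletes", "true")
    .table("catalog.bronze.A")
    .select("*")  # query text stays the same when A gains columns
    .writeStream.option("checkpointLocation", "/Volumes/catalog/silver/checkpoints/B")
    .trigger(availableNow=True)
    .toTable("B")
)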

-werners-
Esteemed Contributor III

Change data feed might be a solution for you, perhaps.
https://docs.databricks.com/en/delta/delta-change-data-feed.html
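A minimal sketch of that idea, assuming the feature can be enabled on A (the table property and read option below are the standard Delta change data feed ones):

# One-time: enable the change data feed on the source table.
spark.sql(
    "ALTER TABLE catalog.bronze.A "
    "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

# Stream only row-level changes from A instead of re-reading the whole table.
changes = (
    spark.readStream.option("readChangeFeed", "true")
    .table("catalog.bronze.A")
)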
