Adding a new column triggers reprocessing of Auto Loader source table
03-25-2024 09:19 AM
I have a source table A in Unity Catalog. This table is constantly written to and is a streaming table.
I also have another table B in Unity Catalog. This is a managed table with liquid clustering.
Using Auto Loader, I move new data from A to B with code similar to the following:
streaming_query = (
    spark.readStream.option("ignoreDeletes", "true")
    .table("catalog.bronze.A")
    .selectExpr(
        "column_1",
        "column_2"
    )
    .writeStream.option("checkpointLocation", "/Volumes/catalog/silver/checkpoints/B")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("B")
)
streaming_query.awaitTermination(timeout=300)
Everything was running smoothly, but I then decided to add a new column to the SELECT, so I end up with a SELECT similar to:
    .selectExpr(
        "column_1",
        "column_2",
        "column_3"
    )
Now, after this configuration change, all the existing data in B got duplicated. The new column was added correctly to the duplicated records. It is as if the whole A table was reprocessed and appended to the already existing data in B.
Is this the expected behaviour based on my configuration? What can I do to avoid this in the future and just have the old data have NULL in the added columns?
03-26-2024 08:27 AM
OK, so how does schema evolution relate to .option("mergeSchema", "true") then? Are they different things? Do they step on each other's toes?
If I make non-breaking changes to the schema (just adding), am I to understand that I can simply remove the .option("mergeSchema", "true")?
03-26-2024 08:40 AM
- To prevent data duplication, you can perform data deduplication based on a unique identifier (e.g., a primary key). -> I don't want to do data deduplication every time I add a column.
- If your data has a natural key or timestamp, consider using it to identify unique records during the merge process. -> How?
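One way to use a natural key here, sketched under assumptions: replace the plain append with a foreachBatch handler that runs a Delta MERGE keyed on that column, so a replayed row updates its existing copy in B instead of being appended again. The example assumes column_1 uniquely identifies a record (the thread does not say so), and merge_condition and make_upsert are hypothetical helper names.

```python
def merge_condition(key_columns, target_alias="t", source_alias="s"):
    """Build the ON clause that matches target and source rows by key."""
    return " AND ".join(
        f"{target_alias}.{col} = {source_alias}.{col}" for col in key_columns
    )


def make_upsert(target_table, key_columns):
    """Return a foreachBatch handler that merges each micro-batch into target_table."""
    condition = merge_condition(key_columns)

    def upsert(batch_df, batch_id):
        # Imported lazily so merge_condition can be used without Delta installed.
        from delta.tables import DeltaTable

        # MERGE fails when several source rows match one target row,
        # so collapse duplicate keys inside the batch first.
        deduped = batch_df.dropDuplicates(key_columns)
        target = DeltaTable.forName(batch_df.sparkSession, target_table)
        (
            target.alias("t")
            .merge(deduped.alias("s"), condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

    return upsert


# Hypothetical usage, replacing .toTable("B") in the original query.
# Assumption: column_1 is a unique key.
#   (spark.readStream.option("ignoreDeletes", "true")
#       .table("catalog.bronze.A")
#       .selectExpr("column_1", "column_2", "column_3")
#       .writeStream
#       .option("checkpointLocation", "/Volumes/catalog/silver/checkpoints/B")
#       .trigger(availableNow=True)
#       .foreachBatch(make_upsert("B", ["column_1"]))
#       .start())
```

With this shape, a full replay of A rewrites matching rows rather than duplicating them. As far as I know, a merge only adds new source columns to the target when Delta's schema evolution for merge is enabled, so column_3 may need to exist in B beforehand.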
03-26-2024 09:04 AM
OK, I might be totally wrong here, but it seems you are not using Auto Loader to move data from A to B. Auto Loader is an easy way to process ingested files, but here you run a Spark Structured Streaming query on table A.
When you change the selectExpr, the streaming query is restarted and the whole table is sent to B.
03-27-2024 12:20 AM
Indeed, @-werners-, you might be right, since I don't really use the cloudFiles functionality, which is really what Auto Loader is about.
In any case, even if it's just regular Structured Streaming, I have a checkpoint configured that records which data has already been processed, so I don't see why reprocessing the whole table is a reasonable thing to do.
03-27-2024 12:31 AM
The purpose of checkpoints is to recover after a failure. In your case, the streaming query itself has changed. Structured Streaming isn't stateless, which means that in general a checkpoint cannot be reused once the query is changed.
Depending on the case, the checkpoint may still be usable after a change; see the recovery semantics:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-....
In your case the schema changes:
"Structured Streaming automatically checkpoints the state data to fault-tolerant storage (for example, HDFS, AWS S3, Azure Blob storage) and restores it after restart. However, this assumes that the schema of the state data remains same across restarts."
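If a query change does force a fresh checkpoint, a full replay of A is not the only option: the Delta streaming source accepts a startingVersion option, which is only honored when a query starts with a new checkpoint. A rough sketch, assuming you can work out which version of A was the last one already written to B; the function name and the version number in the usage comment are hypothetical.

```python
def read_from_version(spark, source_table, starting_version):
    """Stream a Delta table from a given table version (inclusive)
    instead of from the beginning of its history."""
    return (
        spark.readStream
        .option("ignoreDeletes", "true")
        # Ignored if the checkpoint already holds progress for this query.
        .option("startingVersion", str(starting_version))
        .table(source_table)
    )


# Hypothetical usage with a new checkpoint path. 1234 stands in for the
# first version of A that has NOT yet been written to B.
#   df = read_from_version(spark, "catalog.bronze.A", 1234)
#   (df.selectExpr("column_1", "column_2", "column_3")
#      .writeStream
#      .option("checkpointLocation", "/Volumes/catalog/silver/checkpoints/B_v2")
#      .option("mergeSchema", "true")
#      .trigger(availableNow=True)
#      .toTable("B"))
```

Getting the version wrong in either direction loses or duplicates rows, so this is best paired with a check of A's table history before the first run.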
04-02-2024 12:30 AM
What if I alter my target table B in advance, so that it contains the new columns before the query starts writing to them?
04-02-2024 12:45 AM
I don't think that would work, as you would still have to change the query to select the new columns (unless you use SELECT *).
Here is an overview of what can and cannot be changed:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-...
Your case falls under "Changes in projection / filter / map-like operations".
04-02-2024 12:54 AM
Change data feed might be a solution for you.
https://docs.databricks.com/en/delta/delta-change-data-feed.html
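For reference, reading a Delta table's change data feed as a stream looks roughly like the sketch below. It assumes the feed has been enabled on A through the delta.enableChangeDataFeed table property, and I have not verified that this works for the kind of streaming table used in this thread. The function names are hypothetical; readChangeFeed and the _change_type column come from the change data feed documentation linked above.

```python
def read_change_feed(spark, source_table, starting_version=None):
    """Stream row-level changes from a Delta table with change data feed enabled."""
    reader = spark.readStream.option("readChangeFeed", "true")
    if starting_version is not None:
        # Optional: begin at a specific table version instead of the start.
        reader = reader.option("startingVersion", str(starting_version))
    return reader.table(source_table)


def only_inserts(changes_df):
    """Keep newly inserted rows; _change_type is a metadata column
    that the change data feed adds to every row."""
    return changes_df.filter("_change_type = 'insert'")


# Hypothetical usage:
#   changes = read_change_feed(spark, "catalog.bronze.A")
#   new_rows = only_inserts(changes).selectExpr("column_1", "column_2", "column_3")
```

Because each change row says whether it is an insert, update, or delete, the downstream write can apply only what actually changed in A, which is the property that makes the feed attractive here.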