2 weeks ago
I want to raise a feature request as follows.
Currently, with automatic schema evolution for MERGE, when a column is not present in the source dataset it is left unchanged in the target dataset. For example:
%sql
CREATE OR REPLACE TABLE edw_nprd_aen.bronze.test_table (
  id INT
  , assignments ARRAY<STRING>
);
INSERT INTO edw_nprd_aen.bronze.test_table
VALUES
  ( 1, ARRAY('S1-1', 'S1-2') )
  , (2, NULL)
  , (3, ARRAY('S3-1'))
  , (4, ARRAY('S4-1'));
SELECT *
FROM edw_nprd_aen.bronze.test_table;

%sql
CREATE OR REPLACE TEMPORARY VIEW test_view (id) AS VALUES (1), (3);
SELECT *
FROM test_view;

%sql
MERGE WITH SCHEMA EVOLUTION INTO edw_nprd_aen.bronze.test_table
USING test_view
ON test_table.id = test_view.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
SELECT *
FROM edw_nprd_aen.bronze.test_table
ORDER BY id;
What I want is that if the row matches and a column is not present in the source, that column is set to NULL.
Currently, I'm achieving this as follows:
# Build the UPDATE SET list explicitly: columns missing from the
# source view are set to NULL instead of being left unchanged.
table_cols = spark.table('edw_nprd_aen.bronze.test_table').columns
view_cols = spark.table('test_view').columns

# Hoisting the join out of the f-string avoids the backslash-in-expression
# restriction (a SyntaxError on Python versions before 3.12).
set_clause = '\n, '.join(
    f"{col} = test_view.{col}" if col in view_cols else f"{col} = NULL"
    for col in table_cols
)
merge_string = f"""
MERGE WITH SCHEMA EVOLUTION INTO edw_nprd_aen.bronze.test_table
USING test_view
ON test_table.id = test_view.id
WHEN MATCHED THEN UPDATE SET
{set_clause}
WHEN NOT MATCHED THEN INSERT *
"""
print(merge_string)
spark.sql(merge_string)
It would be great if there were a built-in option for this, for example:
UPDATE SET * NULL IF NOT PRESENT
2 weeks ago - last edited 2 weeks ago
Problem
When using MERGE INTO ... WITH SCHEMA EVOLUTION, if a column exists in the target table but is not present in the source dataset, that column is left unchanged on matched rows.
Solution Thinking
This can be emulated by introspecting the table and view schemas and generating the UPDATE SET list explicitly, assigning NULL to any target-only columns.
Caution:
This should work, but it adds complexity, dynamic SQL, and more room for errors in larger pipelines.
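The generation step in the workaround can be exercised without a cluster. Below is a minimal, Spark-free sketch of the same SET-clause construction, assuming the two column lists have already been fetched; the function name `build_update_set` is mine, not from the thread.

```python
def build_update_set(table_cols, view_cols, source_alias="test_view"):
    """Build the UPDATE SET clause: columns present in the source are
    copied from it; target-only columns are explicitly set to NULL."""
    view_set = set(view_cols)
    assignments = [
        f"{col} = {source_alias}.{col}" if col in view_set else f"{col} = NULL"
        for col in table_cols
    ]
    return "\n, ".join(assignments)


# Mirrors the example above: the target has (id, assignments),
# while the source view only has (id).
print(build_update_set(["id", "assignments"], ["id"]))
# id = test_view.id
# , assignments = NULL
```

Keeping the clause builder as a pure function like this also makes the dynamic-SQL part of the pipeline unit-testable in isolation.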
2 weeks ago
Hi @ManojkMohan
Can you identify the possible errors in larger pipelines? How could I achieve the required output without the above code? Your reply is not very clear to me.
The reason behind my feature request is as follows: the source is a NoSQL database, so the schema coming from it is uncertain. Any new data I receive should be written into the table, with the previous data overwritten if the key is already present. Since the schema is uncertain and the required behaviour on matched keys is overwrite, I have written this pipeline.
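For readers with the same uncertain-schema setup, one way to picture the problem: documents from a NoSQL source do not all carry the same keys, so a matched-row overwrite has to decide what happens to keys a given document omits. A small sketch, using plain Python dicts standing in for documents (no Spark; the helper `align_records` is illustrative, not from the thread):

```python
def align_records(records):
    """Align variable-schema documents to their union schema, filling
    keys a document omits with None (the 'overwrite with NULL' view)."""
    # Union of all keys, in first-seen order.
    schema = []
    for rec in records:
        for key in rec:
            if key not in schema:
                schema.append(key)
    return [{key: rec.get(key) for key in schema} for rec in records]


docs = [{"id": 1, "a": "x"}, {"id": 2, "b": "y"}]
print(align_records(docs))
# [{'id': 1, 'a': 'x', 'b': None}, {'id': 2, 'a': None, 'b': 'y'}]
```

This is exactly the semantics the feature request asks MERGE to provide natively: missing source columns become NULL rather than retaining their old target values.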
a week ago
The introspection-based approach (e.g. spark.table(...).columns) introduces fragility.
Alternatives
Full Replace Partition (Bronze Layer)
Column Mapping + Rename/Drop (Metadata-Only):
Enable on table: ALTER TABLE test_table SET TBLPROPERTIES (delta.columnMapping.mode = 'name');
Then MERGE ... UPDATE SET * ignores physical mismatches
https://docs.databricks.com/aws/en/delta/column-mapping
An option like MERGE ... UPDATE SET * NULL_MISSING would standardize NoSQL-to-Delta syncs, cutting custom logic by roughly 80% vs. the delete+insert alternative (~2x write amplification). The docs confirm that UPDATE SET * skips missing source columns intentionally; nulling them instead would align with INSERT * semantics for true overwrites.
a week ago - last edited a week ago
Hi @ManojkMohan
Thanks for pointing out the issues. I now see the pain points of generating the schema dynamically: the schema read can race with concurrent writes, and in streaming / DLT tables a schema read on every write would add significant cost.
But the solution you provided also has its own drawbacks:
Also, what is the benefit of column mapping? It focuses on renaming and dropping columns, neither of which I'm doing here.
Keeping your points in mind, I have these solutions in mind.
Option 1, delete the matched rows, then insert everything:
MERGE WITH SCHEMA EVOLUTION INTO test_table target
USING v_final source
ON target.id = source.id
WHEN MATCHED THEN DELETE;

MERGE WITH SCHEMA EVOLUTION INTO test_table target
USING v_final source
ON target.id = source.id
WHEN NOT MATCHED THEN INSERT *;
Option 2, a single merge over a source unioned with a mode column:
MERGE WITH SCHEMA EVOLUTION INTO test_table target
USING (
  SELECT *, 'delete' AS mode FROM test_view
  UNION
  SELECT *, 'insert' AS mode FROM test_view
) AS source
ON target.id = source.id AND source.mode = 'delete'
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT * EXCEPT (mode);
If you have any other approaches in mind, I would like to hear them.
As of now, I have a batch setup and can ensure there are no concurrent writes happening on the table. However, breaking the merge into two parts will roughly double the runtime.