How to dedupe a source table prior to merge through JDBC SQL driver integration
10-25-2022 02:29 AM
Hi!
We're working with change event data from relational and NoSQL databases, then processing and ingesting it into Databricks. The data is streamed from the source to our messaging platform, and our connector pushes it on to Databricks.
Right now we're doing that using the Databricks JDBC driver and SQL - which is what we've done with most other database destinations. We're using COPY INTO with Parquet files loaded to DBFS. If the table doesn't exist, it's created. If it does exist and schemas have drifted, columns are added as required, along with partition and clustering fields.
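For context, the COPY INTO step looks roughly like this (table and path names here are hypothetical placeholders, not our real ones):

```sql
-- Load staged Parquet files into the bronze table;
-- 'mergeSchema' lets new columns from drifted schemas be picked up.
COPY INTO bronze_table
FROM '/some/staging/path/'
FILEFORMAT = PARQUET
COPY_OPTIONS ('mergeSchema' = 'true');
```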
We're working on an improvement: supporting MERGE INTO. We want to reflect the 'latest state' of each source database record in Databricks.
We've got a few challenges.
One is the preprocessing needed to avoid multiple matches (deduplication), mentioned in the Databricks docs:
-
A MERGE operation can fail if multiple rows of the source dataset match and attempt to update the same rows of the target Delta table. According to the SQL semantics of merge, such an update operation is ambiguous as it is unclear which source row should be used to update the matched target row. You can preprocess the source table to eliminate the possibility of multiple matches. See the Change data capture example—it preprocesses the change dataset (that is, the source dataset) to retain only the latest change for each key before applying that change into the target Delta table
-
We looked at the linked Change data capture example, but it's based on notebooks; there's no clear solution for the JDBC driver integration.
What's the recommended way of handling this?
One theory is a VIEW that retains the latest change for each key, then merging from that into the final table - but that seems inefficient. We've also tried writing the equivalent deduplication DELETE statements that we use for other database destinations, but they rely on either joins or sub-queries like the one below, which Databricks doesn't seem to support: no joins in DELETE, and queries like this fail with column mismatch errors:
DELETE FROM {table} WHERE struct({id}, {timestamp}) NOT IN (SELECT struct({id}, {timestamp}) FROM {table} GROUP BY {id})
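For what it's worth, the column mismatch may come from selecting {timestamp} un-aggregated alongside GROUP BY {id}. A variant that aggregates the timestamp would at least be valid SQL (sketch with hypothetical `id`/`ts` column and `events` table names; whether NOT IN sub-queries in DELETE work may still depend on the Databricks runtime version):

```sql
-- Keep only the latest row per key; MAX(ts) makes the
-- sub-query valid under GROUP BY id.
DELETE FROM events
WHERE struct(id, ts) NOT IN (
  SELECT struct(id, MAX(ts))
  FROM events
  GROUP BY id
);
```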
Does this mean our external systems will have to rely on, and interact with, notebooks to achieve this?
10-25-2022 07:29 AM
Update on the theory we're looking at.
It'd be similar to the below (with the necessary changes to support MERGE best practices, such as reducing the search space):
-- View for deduping pre-merge: keep only the latest change per key
CREATE OR REPLACE TEMPORARY VIEW {view} AS
SELECT * EXCEPT (dedupe_key)
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY {id} ORDER BY {timestamp} DESC) AS dedupe_key
  FROM {bronze_table}
)
WHERE dedupe_key = 1;

-- Simple full-table merge w/o tombstone handling/deletions
MERGE INTO {silver_table}
USING {view}
ON {silver_table}.{id} = {view}.{id}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
The above doesn't solve the problem of schema drift, though. We have some situations with incompatible schema changes that require a bit of wrangling to resolve. We're not looking to delete columns, or to rewrite columns on incompatible type changes; a type change (compatible or otherwise) is added as a new column instead. We do this currently with append-only.
So that leads to a related question of how we do that in this merge use-case.
Do we alter the silver table as we do with the bronze table, generating ALTER TABLE ... ADD COLUMN statements as required? Or do we run a REPLACE TABLE {silver_table} USING DELTA AS SELECT * FROM {view} LIMIT 0; before the MERGE?
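The first option would amount to something like the sketch below (column name and type are hypothetical). There's also a session setting we haven't fully evaluated that is supposed to let MERGE evolve the target schema automatically:

```sql
-- Option 1: evolve the silver table explicitly, as we do for bronze
ALTER TABLE {silver_table} ADD COLUMNS (some_new_col STRING);

-- Option 2 (untested by us): automatic schema evolution for MERGE
SET spark.databricks.delta.schema.autoMerge.enabled = true;
```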
Again, we're concerned about the inefficiencies here. We're aware of APPLY CHANGES INTO and other DLT features, but they're all heavily dependent on notebooks.

