<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Re: How to dedupe a source table prior to merge through JDBC SQL driver integration in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-to-dedupe-a-source-table-prior-to-merge-through-jdbc-sql/m-p/25915#M18087</link>
    <description>&lt;P&gt;Update on the &lt;I&gt;theory&lt;/I&gt; we are looking at.&lt;/P&gt;&lt;P&gt;It would be similar to the following (with the necessary changes to support MERGE best practices, such as reducing the search space):&lt;/P&gt;&lt;P&gt;-- View for deduping pre-merge&lt;/P&gt;&lt;P&gt;CREATE OR REPLACE TEMPORARY VIEW {view} AS SELECT * EXCEPT (dedupe_key) FROM (SELECT DISTINCT *, ROW_NUMBER() OVER (PARTITION BY {id} ORDER BY {timestamp} DESC) AS dedupe_key FROM {bronze_table}) WHERE dedupe_key = 1;&lt;/P&gt;&lt;P&gt;-- Simple full-table merge without tombstone handling/deletions&lt;/P&gt;&lt;P&gt;MERGE INTO {silver_table} USING {view} ON {silver_table}.{id} = {view}.{id} WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *;&lt;/P&gt;&lt;P&gt;The above doesn't solve the problem of schema drift, though. We have some situations with incompatible schema changes that require a bit of wrangling to resolve. We never want to delete columns or rewrite columns on incompatible type changes; instead, any type change (compatible or otherwise) is added as a new column. We do this today with append-only loads.&lt;/P&gt;&lt;P&gt;That leads to a related question: how do we do that in this merge use case?&lt;/P&gt;&lt;P&gt;Do we alter the silver table as we do the bronze table, generating ALTER TABLE ... ADD COLUMN statements as required? Or do we run REPLACE TABLE {silver_table} USING DELTA AS SELECT * FROM {view} LIMIT 0; before the MERGE?&lt;/P&gt;&lt;P&gt;We're concerned about the inefficiencies here. We're aware of APPLY CHANGES INTO and other DLT features, but again, they're all heavily dependent on Notebooks.&lt;/P&gt;</description>
    <pubDate>Tue, 25 Oct 2022 14:29:22 GMT</pubDate>
    <dc:creator>jon1</dc:creator>
    <dc:date>2022-10-25T14:29:22Z</dc:date>
    <item>
      <title>How to dedupe a source table prior to merge through JDBC SQL driver integration</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-dedupe-a-source-table-prior-to-merge-through-jdbc-sql/m-p/25914#M18086</link>
      <description>&lt;P&gt;Hi!&lt;/P&gt;&lt;P&gt;We're working with change event data from relational and NoSQL databases, processing it, and ingesting it into Databricks. It's streamed from the source to our messaging platform, and our connector then pushes it to Databricks.&lt;/P&gt;&lt;P&gt;Right now we're doing that using the Databricks JDBC driver integration and SQL, which is what we've done with most other database destinations. We're using COPY INTO with Parquet files loaded to DBFS. If the table doesn't exist already, it's created. If it does exist and schemas have drifted, columns are added as required. Partition and clustering fields are added too.&lt;/P&gt;&lt;P&gt;We're working on an improvement: supporting &lt;A href="https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-merge-into.html?_ga=2.238412992.138626079.1666687993-1321171150.1666687993" alt="https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-merge-into.html?_ga=2.238412992.138626079.1666687993-1321171150.1666687993" target="_blank"&gt;MERGE INTO&lt;/A&gt;. We want to reflect the source database record's latest state in Databricks.&lt;/P&gt;&lt;P&gt;We've got a few challenges.&lt;/P&gt;&lt;P&gt;One is the preprocessing needed to avoid multiple matches (deduping), mentioned in the Databricks docs:&lt;/P&gt;&lt;P&gt;-&lt;/P&gt;&lt;P&gt;A MERGE operation can fail if multiple rows of the source dataset match and attempt to update the same rows of the target Delta table. According to the SQL semantics of merge, such an update operation is ambiguous as it is unclear which source row should be used to update the matched target row. You can preprocess the source table to eliminate the possibility of multiple matches. See the &lt;A href="https://docs.databricks.com/delta/merge.html#merge-in-cdc" alt="https://docs.databricks.com/delta/merge.html#merge-in-cdc" target="_blank"&gt;Change data capture example&lt;/A&gt;—it preprocesses the change dataset (that is, the source dataset) to retain only the latest change for each key before applying that change into the target Delta table.&lt;/P&gt;&lt;P&gt;-&lt;/P&gt;&lt;P&gt;We looked at the linked Change data capture example, but it is based on Notebooks, with no clear solution when using the JDBC driver integration.&lt;/P&gt;&lt;P&gt;What's the recommended way of handling this?&lt;/P&gt;&lt;P&gt;One theory we had is a VIEW that retains the latest change for each key, then merging from that into the final table, but that seems inefficient. We've tried writing the equivalent DELETE SQL statements (for deduping) that we use for other database destinations, but they rely on either joins or sub-queries like the one below, which Databricks doesn't seem to support: joins aren't allowed in deletes, and queries like this fail with column mismatch errors:&lt;/P&gt;&lt;P&gt;DELETE FROM {table} WHERE struct({id}, {timestamp}) NOT IN (SELECT struct({id}, {timestamp}) FROM {table} GROUP BY {id})&lt;/P&gt;&lt;P&gt;It feels like our external systems may have to rely on and interact with Notebooks to achieve this?&lt;/P&gt;</description>
      <pubDate>Tue, 25 Oct 2022 09:29:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-dedupe-a-source-table-prior-to-merge-through-jdbc-sql/m-p/25914#M18086</guid>
      <dc:creator>jon1</dc:creator>
      <dc:date>2022-10-25T09:29:42Z</dc:date>
    </item>
    <item>
      <title>Re: How to dedupe a source table prior to merge through JDBC SQL driver integration</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-dedupe-a-source-table-prior-to-merge-through-jdbc-sql/m-p/25915#M18087</link>
      <description>&lt;P&gt;Update on the &lt;I&gt;theory&lt;/I&gt; we are looking at.&lt;/P&gt;&lt;P&gt;It would be similar to the following (with the necessary changes to support MERGE best practices, such as reducing the search space):&lt;/P&gt;&lt;P&gt;-- View for deduping pre-merge&lt;/P&gt;&lt;P&gt;CREATE OR REPLACE TEMPORARY VIEW {view} AS SELECT * EXCEPT (dedupe_key) FROM (SELECT DISTINCT *, ROW_NUMBER() OVER (PARTITION BY {id} ORDER BY {timestamp} DESC) AS dedupe_key FROM {bronze_table}) WHERE dedupe_key = 1;&lt;/P&gt;&lt;P&gt;-- Simple full-table merge without tombstone handling/deletions&lt;/P&gt;&lt;P&gt;MERGE INTO {silver_table} USING {view} ON {silver_table}.{id} = {view}.{id} WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *;&lt;/P&gt;&lt;P&gt;The above doesn't solve the problem of schema drift, though. We have some situations with incompatible schema changes that require a bit of wrangling to resolve. We never want to delete columns or rewrite columns on incompatible type changes; instead, any type change (compatible or otherwise) is added as a new column. We do this today with append-only loads.&lt;/P&gt;&lt;P&gt;That leads to a related question: how do we do that in this merge use case?&lt;/P&gt;&lt;P&gt;Do we alter the silver table as we do the bronze table, generating ALTER TABLE ... ADD COLUMN statements as required? Or do we run REPLACE TABLE {silver_table} USING DELTA AS SELECT * FROM {view} LIMIT 0; before the MERGE?&lt;/P&gt;&lt;P&gt;We're concerned about the inefficiencies here. We're aware of APPLY CHANGES INTO and other DLT features, but again, they're all heavily dependent on Notebooks.&lt;/P&gt;</description>
      <pubDate>Tue, 25 Oct 2022 14:29:22 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-dedupe-a-source-table-prior-to-merge-through-jdbc-sql/m-p/25915#M18087</guid>
      <dc:creator>jon1</dc:creator>
      <dc:date>2022-10-25T14:29:22Z</dc:date>
    </item>
  </channel>
</rss>