<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: How can I dedupe from a table created from a Kinesis change data capture feed. in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-can-i-dedupe-from-a-table-created-from-a-kinesis-change-data/m-p/105003#M41971</link>
    <description>&lt;P&gt;Hello&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/20512"&gt;@aliacovella&lt;/a&gt;,&lt;/P&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;Looks like there are duplicate records in your source table that match the same target record. This is indeed the case since your source table, &lt;CODE&gt;organizations_silver&lt;/CODE&gt;, contains duplicates due to the append-only nature of the Kinesis stream.&lt;/P&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;SPAN&gt;To properly deduplicate records from a Kinesis change data capture feed, you can use the &lt;CODE&gt;APPLY CHANGES&lt;/CODE&gt; API provided by Delta Live Tables (DLT). This API simplifies change data capture (CDC) by handling out-of-sequence records and ensuring correct processing of CDC records.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;SPAN&gt;&lt;A href="https://docs.databricks.com/en/delta-live-tables/cdc.html" target="_blank"&gt;https://docs.databricks.com/en/delta-live-tables/cdc.html&lt;/A&gt;&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Thu, 09 Jan 2025 17:25:33 GMT</pubDate>
    <dc:creator>Alberto_Umana</dc:creator>
    <dc:date>2025-01-09T17:25:33Z</dc:date>
    <item>
      <title>How can I dedupe from a table created from a Kinesis change data capture feed.</title>
      <link>https://community.databricks.com/t5/data-engineering/how-can-i-dedupe-from-a-table-created-from-a-kinesis-change-data/m-p/104972#M41959</link>
      <description>&lt;P&gt;Here I have a table named&amp;nbsp;organizations_silver that was built from a bronze table created from a Kinesis change data capture feed.&lt;/P&gt;&lt;DIV&gt;&lt;PRE&gt;import dlt&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;@dlt.table&lt;/SPAN&gt;(&lt;SPAN&gt;name&lt;/SPAN&gt;=&lt;SPAN&gt;"kinesis_raw_stream"&lt;/SPAN&gt;, &lt;SPAN&gt;table_properties&lt;/SPAN&gt;={&lt;SPAN&gt;"pipelines.reset.allowed"&lt;/SPAN&gt;: &lt;SPAN&gt;"false"&lt;/SPAN&gt;})&lt;BR /&gt;&lt;SPAN&gt;def &lt;/SPAN&gt;&lt;SPAN&gt;kinesis_raw_stream&lt;/SPAN&gt;():&lt;BR /&gt;    &lt;SPAN&gt;return &lt;/SPAN&gt;read_kinesis_stream(&lt;SPAN&gt;stream_name&lt;/SPAN&gt;=STREAM, &lt;SPAN&gt;dbutils&lt;/SPAN&gt;=dbutils, &lt;SPAN&gt;spark&lt;/SPAN&gt;=spark)&lt;/PRE&gt;&lt;/DIV&gt;&lt;P&gt;The initial table is an append-only table fed by the Kinesis stream. From there, I destructured the data into the&amp;nbsp;organizations_silver table.&lt;/P&gt;&lt;DIV&gt;&lt;PRE&gt;from pyspark.sql.functions import col, decode, from_json&lt;BR /&gt;from pyspark.sql.types import IntegerType, StringType, StructField, StructType&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;@dlt.table&lt;/SPAN&gt;(&lt;SPAN&gt;name&lt;/SPAN&gt;=&lt;SPAN&gt;"organizations_silver"&lt;/SPAN&gt;)&lt;BR /&gt;&lt;SPAN&gt;def &lt;/SPAN&gt;&lt;SPAN&gt;organizations_silver&lt;/SPAN&gt;():&lt;BR /&gt;    &lt;SPAN&gt;org_schema &lt;/SPAN&gt;= StructType([&lt;BR /&gt;        StructField(&lt;SPAN&gt;"id"&lt;/SPAN&gt;, IntegerType(), &lt;SPAN&gt;True&lt;/SPAN&gt;),&lt;BR /&gt;        StructField(&lt;SPAN&gt;"org_name"&lt;/SPAN&gt;, StringType(), &lt;SPAN&gt;True&lt;/SPAN&gt;),&lt;BR /&gt;        StructField(&lt;SPAN&gt;"region"&lt;/SPAN&gt;, StringType(), &lt;SPAN&gt;True&lt;/SPAN&gt;)&lt;BR /&gt;    ])&lt;BR /&gt;&lt;BR /&gt;    &lt;SPAN&gt;return &lt;/SPAN&gt;(&lt;BR /&gt;        dlt.readStream(&lt;SPAN&gt;"kinesis_raw_stream"&lt;/SPAN&gt;)&lt;BR /&gt;        .select(decode(col(&lt;SPAN&gt;"data"&lt;/SPAN&gt;), &lt;SPAN&gt;"UTF-8"&lt;/SPAN&gt;).alias(&lt;SPAN&gt;"json_string"&lt;/SPAN&gt;))&lt;BR /&gt;        .select(from_json(col(&lt;SPAN&gt;"json_string"&lt;/SPAN&gt;), raw_schema).alias(&lt;SPAN&gt;"json_data"&lt;/SPAN&gt;))&lt;BR /&gt;        .filter(col(&lt;SPAN&gt;"json_data.metadata.table-name"&lt;/SPAN&gt;) == &lt;SPAN&gt;"organizations"&lt;/SPAN&gt;)&lt;BR /&gt;        .select(from_json(col(&lt;SPAN&gt;"json_data.data"&lt;/SPAN&gt;), &lt;SPAN&gt;org_schema&lt;/SPAN&gt;).alias(&lt;SPAN&gt;"org_data"&lt;/SPAN&gt;))&lt;BR /&gt;        .select(&lt;SPAN&gt;"org_data.id"&lt;/SPAN&gt;, &lt;SPAN&gt;"org_data.org_name"&lt;/SPAN&gt;, &lt;SPAN&gt;"org_data.region"&lt;/SPAN&gt;)&lt;BR /&gt;    )&lt;/PRE&gt;&lt;/DIV&gt;
&lt;P&gt;The problem is that, because the original table is append-only, records are duplicated in the silver table.&lt;/P&gt;&lt;P&gt;I attempted to create another table from the silver table to dedupe it as follows:&lt;/P&gt;&lt;DIV&gt;&lt;PRE&gt;from delta.tables import DeltaTable&lt;BR /&gt;&lt;BR /&gt;df = spark.table(&lt;SPAN&gt;"development_demo_catalog.default.organizations_silver"&lt;/SPAN&gt;)&lt;BR /&gt;&lt;BR /&gt;df.write.format(&lt;SPAN&gt;"delta"&lt;/SPAN&gt;).mode(&lt;SPAN&gt;"overwrite"&lt;/SPAN&gt;).saveAsTable(&lt;SPAN&gt;"development_demo_catalog.default.organizations_deduped"&lt;/SPAN&gt;)&lt;BR /&gt;&lt;BR /&gt;DeltaTable.forName(spark, &lt;SPAN&gt;"development_demo_catalog.default.organizations_deduped"&lt;/SPAN&gt;) \&lt;BR /&gt;    .alias(&lt;SPAN&gt;"target"&lt;/SPAN&gt;) \&lt;BR /&gt;    .merge(&lt;BR /&gt;        df.alias(&lt;SPAN&gt;"source"&lt;/SPAN&gt;),&lt;BR /&gt;        &lt;SPAN&gt;"source.id = target.id"&lt;/SPAN&gt;&lt;BR /&gt;    ) \&lt;BR /&gt;    .whenMatchedUpdateAll() \&lt;BR /&gt;    .whenNotMatchedInsertAll() \&lt;BR /&gt;    .execute()&lt;/PRE&gt;&lt;/DIV&gt;&lt;P&gt;My understanding is that if there is a match, it should update all the columns in the existing deduped table. 
However, when I run the notebook, I encounter the following error:&lt;/P&gt;&lt;PRE&gt;&lt;SPAN&gt;Cannot perform Merge as multiple source rows matched and attempted to modify the same&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P&gt;This indicates that there are duplicates in the source, which is true and is exactly why I'm trying to dedupe it. What am I missing here, and how should I properly dedupe records from a Kinesis change data capture feed?&lt;/P&gt;</description>
      <pubDate>Thu, 09 Jan 2025 15:10:28 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-can-i-dedupe-from-a-table-created-from-a-kinesis-change-data/m-p/104972#M41959</guid>
      <dc:creator>aliacovella</dc:creator>
      <dc:date>2025-01-09T15:10:28Z</dc:date>
    </item>
    <item>
      <title>Re: How can I dedupe from a table created from a Kinesis change data capture feed.</title>
      <link>https://community.databricks.com/t5/data-engineering/how-can-i-dedupe-from-a-table-created-from-a-kinesis-change-data/m-p/105003#M41971</link>
      <description>&lt;P&gt;Hello&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/20512"&gt;@aliacovella&lt;/a&gt;,&lt;/P&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;Looks like there are duplicate records in your source table that match the same target record. This is indeed the case since your source table, &lt;CODE&gt;organizations_silver&lt;/CODE&gt;, contains duplicates due to the append-only nature of the Kinesis stream.&lt;/P&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;SPAN&gt;To properly deduplicate records from a Kinesis change data capture feed, you can use the &lt;CODE&gt;APPLY CHANGES&lt;/CODE&gt; API provided by Delta Live Tables (DLT). This API simplifies change data capture (CDC) by handling out-of-sequence records and ensuring correct processing of CDC records.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;SPAN&gt;&lt;A href="https://docs.databricks.com/en/delta-live-tables/cdc.html" target="_blank"&gt;https://docs.databricks.com/en/delta-live-tables/cdc.html&lt;/A&gt;&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 09 Jan 2025 17:25:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-can-i-dedupe-from-a-table-created-from-a-kinesis-change-data/m-p/105003#M41971</guid>
      <dc:creator>Alberto_Umana</dc:creator>
      <dc:date>2025-01-09T17:25:33Z</dc:date>
    </item>
  </channel>
</rss>

