<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Delta live table: Retrieve CDF columns in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/delta-live-table-retrieve-cdf-columns/m-p/88132#M37487</link>
    <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/103899"&gt;@LauJohansson&lt;/a&gt;&amp;nbsp;,&amp;nbsp;&lt;SPAN&gt;Here's example source code for a DLT pipeline that reads from a bronze table in&amp;nbsp;&lt;A href="https://docs.databricks.com/en/delta/delta-change-data-feed.html#enable" target="_blank"&gt;CDF&lt;/A&gt; and uses the&amp;nbsp;&lt;A href="https://docs.databricks.com/en/delta-live-tables/cdc.html" target="_blank"&gt;apply_changes&lt;/A&gt;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;function to upsert to your silver table, sequencing the upsert rows in order by the&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN class="du-bois-dark-typography css-aqikf3"&gt;&lt;CODE&gt;_commit_timestamp&lt;/CODE&gt;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;column for the latest row-level changes:&lt;/SPAN&gt;&lt;/P&gt;
&lt;DIV&gt;
&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;LI-CODE lang="python"&gt;import dlt
from pyspark.sql.functions import col, expr

source = f"catalog.schema.cdf_enabled_table"

bronze = "cdf_enabled_table_sequency_by_bronze"
silver = "cdf_enabled_table_sequency_by_silver"

@dlt.view(
  name=bronze,
)
def cdf_enabled_table_sequency_by_bronze():
  return spark.readStream.option("readChangeFeed", "true").table(source)

dlt.create_streaming_table(name=silver)

dlt.apply_changes(
  target = silver,
  source = bronze,
  keys = ["id"],
  sequence_by = col("_commit_timestamp"),
  stored_as_scd_type = 1
)&lt;/LI-CODE&gt;&lt;/DIV&gt;
&lt;P&gt;&lt;SPAN&gt;It seems that you're using this setup for data deduplication. It should work, but please consider the&amp;nbsp;&lt;A href="https://docs.databricks.com/en/delta-live-tables/cdc.html#limitations" target="_self"&gt;APPLY_CHANGES limitations&lt;/A&gt;&amp;nbsp;and &lt;A href="https://docs.databricks.com/en/delta-live-tables/python-ref.html#limitations" target="_blank"&gt;CDF limitations&lt;/A&gt; while designing your pipeline.&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Tue, 03 Sep 2024 18:54:13 GMT</pubDate>
    <dc:creator>raphaelblg</dc:creator>
    <dc:date>2024-09-03T18:54:13Z</dc:date>
    <item>
      <title>Delta live table: Retrieve CDF columns</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-live-table-retrieve-cdf-columns/m-p/87208#M37416</link>
      <description>&lt;P&gt;I want to use the apply_changes feature from a bronze table to a silver table. The bronze table&lt;SPAN&gt;&amp;nbsp;has no "natural" sequence_by column. Therefore, I want to use the CDF column "_commit_timestamp" as the sequence_by.&lt;BR /&gt;&lt;BR /&gt;How do I retrieve the columns in a dlt setup?&amp;nbsp;&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 02 Sep 2024 12:42:56 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-live-table-retrieve-cdf-columns/m-p/87208#M37416</guid>
      <dc:creator>LauJohansson</dc:creator>
      <dc:date>2024-09-02T12:42:56Z</dc:date>
    </item>
    <item>
      <title>Re: Delta live table: Retrieve CDF columns</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-live-table-retrieve-cdf-columns/m-p/88132#M37487</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/103899"&gt;@LauJohansson&lt;/a&gt;&amp;nbsp;,&amp;nbsp;&lt;SPAN&gt;Here's example source code for a DLT pipeline that reads from a bronze table in&amp;nbsp;&lt;A href="https://docs.databricks.com/en/delta/delta-change-data-feed.html#enable" target="_blank"&gt;CDF&lt;/A&gt; and uses the&amp;nbsp;&lt;A href="https://docs.databricks.com/en/delta-live-tables/cdc.html" target="_blank"&gt;apply_changes&lt;/A&gt;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;function to upsert to your silver table, sequencing the upsert rows in order by the&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN class="du-bois-dark-typography css-aqikf3"&gt;&lt;CODE&gt;_commit_timestamp&lt;/CODE&gt;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;column for the latest row-level changes:&lt;/SPAN&gt;&lt;/P&gt;
&lt;DIV&gt;
&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;LI-CODE lang="python"&gt;import dlt
from pyspark.sql.functions import col, expr

source = f"catalog.schema.cdf_enabled_table"

bronze = "cdf_enabled_table_sequency_by_bronze"
silver = "cdf_enabled_table_sequency_by_silver"

@dlt.view(
  name=bronze,
)
def cdf_enabled_table_sequency_by_bronze():
  return spark.readStream.option("readChangeFeed", "true").table(source)

dlt.create_streaming_table(name=silver)

dlt.apply_changes(
  target = silver,
  source = bronze,
  keys = ["id"],
  sequence_by = col("_commit_timestamp"),
  stored_as_scd_type = 1
)&lt;/LI-CODE&gt;&lt;/DIV&gt;
&lt;P&gt;&lt;SPAN&gt;It seems that you're using this setup for data deduplication. It should work, but please consider the&amp;nbsp;&lt;A href="https://docs.databricks.com/en/delta-live-tables/cdc.html#limitations" target="_self"&gt;APPLY_CHANGES limitations&lt;/A&gt;&amp;nbsp;and &lt;A href="https://docs.databricks.com/en/delta-live-tables/python-ref.html#limitations" target="_blank"&gt;CDF limitations&lt;/A&gt; while designing your pipeline.&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 03 Sep 2024 18:54:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-live-table-retrieve-cdf-columns/m-p/88132#M37487</guid>
      <dc:creator>raphaelblg</dc:creator>
      <dc:date>2024-09-03T18:54:13Z</dc:date>
    </item>
    <item>
      <title>Re: Delta live table: Retrieve CDF columns</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-live-table-retrieve-cdf-columns/m-p/88138#M37490</link>
      <description>&lt;P&gt;Delta Live Tables Python language reference:&amp;nbsp;&lt;A href="https://docs.databricks.com/en/delta-live-tables/python-ref.html" target="_blank"&gt;https://docs.databricks.com/en/delta-live-tables/python-ref.html&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;Delta Live Tables SQL language reference:&amp;nbsp;&lt;A href="https://docs.databricks.com/en/delta-live-tables/sql-ref.html" target="_blank"&gt;https://docs.databricks.com/en/delta-live-tables/sql-ref.html&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 03 Sep 2024 19:47:37 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-live-table-retrieve-cdf-columns/m-p/88138#M37490</guid>
      <dc:creator>raphaelblg</dc:creator>
      <dc:date>2024-09-03T19:47:37Z</dc:date>
    </item>
    <item>
      <title>Re: Delta live table: Retrieve CDF columns</title>
      <link>https://community.databricks.com/t5/data-engineering/delta-live-table-retrieve-cdf-columns/m-p/88257#M37518</link>
      <description>&lt;P&gt;Thank you&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97998"&gt;@raphaelblg&lt;/a&gt;!&lt;BR /&gt;&lt;BR /&gt;I chose to write an article on the subject after this discussion:&amp;nbsp;&lt;A href="https://www.linkedin.com/pulse/databricks-delta-live-tables-merging-lau-johansson-cdtce/?trackingId=L872gj0yQouXgJudM75gdw%3D%3D" target="_blank"&gt;https://www.linkedin.com/pulse/databricks-delta-live-tables-merging-lau-johansson-cdtce/?trackingId=L872gj0yQouXgJudM75gdw%3D%3D&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 04 Sep 2024 09:48:39 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/delta-live-table-retrieve-cdf-columns/m-p/88257#M37518</guid>
      <dc:creator>LauJohansson</dc:creator>
      <dc:date>2024-09-04T09:48:39Z</dc:date>
    </item>
  </channel>
</rss>

