<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Merge using DLT in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/merge-using-dlt/m-p/128423#M48235</link>
    <description>&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pyspark.sql.functions import col, lit, expr, when, to_timestamp, current_timestamp&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pyspark.sql.functions import max as max_&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import dlt&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pyspark.sql.types import StructType, StructField, StringType&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pyspark.sql.utils import AnalysisException&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pyspark.sql.functions import col, struct, lit&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from typing import Optional, Tuple&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pandas import DataFrame&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Define the source table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;source_table = &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;"LIVE.source"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Define the target table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;target_table = &lt;/SPAN&gt;&lt;SPAN&gt;"target"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;dlt.create_streaming_table(target_table)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;schema = StructType([&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;StructField(&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;StringType()&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;True)&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;StructField(&lt;/SPAN&gt;&lt;SPAN&gt;"rackName"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;StringType()&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;True)&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;StructField(&lt;/SPAN&gt;&lt;SPAN&gt;"clusterName"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;StringType()&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;True)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;])&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97035"&gt;@Dlt&lt;/a&gt;.table(name=&lt;/SPAN&gt;&lt;SPAN&gt;"source_89939"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def source_&lt;/SPAN&gt;&lt;SPAN&gt;89939&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Read from the source table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df = spark.readStream.format(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;).option(&lt;/SPAN&gt;&lt;SPAN&gt;"skipChangeCommits"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;).table(source_table)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Filter data from the last &lt;/SPAN&gt;&lt;SPAN&gt;24&lt;/SPAN&gt;&lt;SPAN&gt; hours&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; filtered_df = df.filter(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (col(&lt;/SPAN&gt;&lt;SPAN&gt;"tenant"&lt;/SPAN&gt;&lt;SPAN&gt;) == lit(&lt;/SPAN&gt;&lt;SPAN&gt;"vdi"&lt;/SPAN&gt;&lt;SPAN&gt;)) &amp;amp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (col(&lt;/SPAN&gt;&lt;SPAN&gt;"rack"&lt;/SPAN&gt;&lt;SPAN&gt;) != lit(&lt;/SPAN&gt;&lt;SPAN&gt;"unknown"&lt;/SPAN&gt;&lt;SPAN&gt;)) &amp;amp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (to_timestamp(col(&lt;/SPAN&gt;&lt;SPAN&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;), &lt;/SPAN&gt;&lt;SPAN&gt;"yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'"&lt;/SPAN&gt;&lt;SPAN&gt;) &amp;gt;= expr(&lt;/SPAN&gt;&lt;SPAN&gt;"current_timestamp() - interval 24 hours"&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Select the necessary columns&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; selected_df = filtered_df.select(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; col(&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; col(&lt;/SPAN&gt;&lt;SPAN&gt;"rack"&lt;/SPAN&gt;&lt;SPAN&gt;).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"rackName"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; col(&lt;/SPAN&gt;&lt;SPAN&gt;"hosts"&lt;/SPAN&gt;&lt;SPAN&gt;).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"hosts"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; to_timestamp(col(&lt;/SPAN&gt;&lt;SPAN&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;), &lt;/SPAN&gt;&lt;SPAN&gt;"yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'"&lt;/SPAN&gt;&lt;SPAN&gt;).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; return selected_df&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;dlt.apply_changes(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; target=target_table,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; source=&lt;/SPAN&gt;&lt;SPAN&gt;"source_89939"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; keys=[&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;],&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; sequence_by=col(&lt;/SPAN&gt;&lt;SPAN&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; name=&lt;/SPAN&gt;&lt;SPAN&gt;"initial"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;def next_snapshot_and_version(latest_snapshot_version: Optional[&lt;/SPAN&gt;&lt;SPAN&gt;int&lt;/SPAN&gt;&lt;SPAN&gt;]) -&amp;gt; Tuple[&lt;/SPAN&gt;&lt;SPAN&gt;Optional&lt;/SPAN&gt;&lt;SPAN&gt;[&lt;/SPAN&gt;&lt;SPAN&gt;int&lt;/SPAN&gt;&lt;SPAN&gt;], &lt;/SPAN&gt;&lt;SPAN&gt;DataFrame&lt;/SPAN&gt;&lt;SPAN&gt;]:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Read from the source table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; source_df = spark.read \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .format(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .option(&lt;/SPAN&gt;&lt;SPAN&gt;"ignoreDeletes"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .table(source_table)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Read from the target table using dlt&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; target_df =dlt.read(&lt;/SPAN&gt;&lt;SPAN&gt;"700025_ctg_dev.test_ramu.target"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Perform the join operation&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; result_df = target_df.alias(&lt;/SPAN&gt;&lt;SPAN&gt;"t"&lt;/SPAN&gt;&lt;SPAN&gt;).join(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; source_df.alias(&lt;/SPAN&gt;&lt;SPAN&gt;"s"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; on=&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; how=&lt;/SPAN&gt;&lt;SPAN&gt;"left_anti"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; ).select(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; col(&lt;/SPAN&gt;&lt;SPAN&gt;"t.name"&lt;/SPAN&gt;&lt;SPAN&gt;).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; lit(None).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"rackName"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; current_timestamp().alias(&lt;/SPAN&gt;&lt;SPAN&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Return the latest snapshot version and the result DataFrame&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; return (latest_snapshot_version, result_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;dlt.create_auto_cdc_from_snapshot_flow(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; target = target_table,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; source = next_snapshot_and_version,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; keys = [&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;],&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; track_history_column_list = None,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; track_history_except_column_list = None&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;calculate a snaposhot and add them to target to make dlt do forceful delete&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;om.databricks.pipelines.common.errors.DLTSparkException: [&lt;/SPAN&gt;&lt;SPAN&gt;REFERENCE_DLT_DATASET_OUTSIDE_QUERY_DEFINITION&lt;/SPAN&gt;&lt;SPAN&gt;] Referencing DLT dataset `&lt;/SPAN&gt;&lt;SPAN&gt;700025&lt;/SPAN&gt;&lt;SPAN&gt;_ctg_dev`.`test_ramu`.`target` outside the dataset query definition (i.e., &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97035"&gt;@Dlt&lt;/a&gt;.table annotation) is not supported. Please read it instead inside the dataset query definition.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;getting this exception&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/DIV&gt;</description>
    <pubDate>Thu, 14 Aug 2025 07:03:56 GMT</pubDate>
    <dc:creator>Ramu1821</dc:creator>
    <dc:date>2025-08-14T07:03:56Z</dc:date>
    <item>
      <title>Merge using DLT</title>
      <link>https://community.databricks.com/t5/data-engineering/merge-using-dlt/m-p/128005#M48137</link>
      <description>&lt;P&gt;I have a requirement where i need only 24 hours data from my delta table&lt;BR /&gt;&lt;BR /&gt;lets call this as latest table&lt;BR /&gt;&lt;BR /&gt;this latest table should be in sync with source&lt;BR /&gt;so, it should handle all updates and inserts along with delete (if something gets deleted at source, then it should be deleted at target)&lt;BR /&gt;&lt;BR /&gt;any approach for this , we are looking for a solution in context of DLT&amp;nbsp;&lt;BR /&gt;i am using&amp;nbsp;&lt;SPAN&gt;create_auto_cdc but this is into taking care of deletes&amp;nbsp;&lt;BR /&gt;apply_as_deletes is only taking care of coming data, but not exisitng data&amp;nbsp;&lt;BR /&gt;&lt;BR /&gt;&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 11 Aug 2025 09:13:27 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/merge-using-dlt/m-p/128005#M48137</guid>
      <dc:creator>Ramu1821</dc:creator>
      <dc:date>2025-08-11T09:13:27Z</dc:date>
    </item>
    <item>
      <title>Re: Merge using DLT</title>
      <link>https://community.databricks.com/t5/data-engineering/merge-using-dlt/m-p/128414#M48229</link>
      <description>&lt;P&gt;Hi Ramu1821,&lt;/P&gt;&lt;P&gt;How are you doing today? as per my understanding, for your use case of keeping only the last 24 hours of data in sync—including inserts, updates, and deletes—within a Delta Live Tables (DLT) pipeline, you're right that create_streaming_live_table(...).create_auto_cdc() and apply_as_deletes = true help capture delete events only when they arrive, but they don’t clean up older data already in the table. A common approach here is to use a two-step logic: first, capture CDC events using apply_changes (or your custom logic) and write to a staging table; then, in your final “latest” table, filter only records where the timestamp is within the last 24 hours. To handle deletes for records that didn’t get new change events, you can run a scheduled batch process (or add logic in a DLT table with triggered mode) that compares timestamps and removes anything older than 24 hours. While DLT doesn’t offer full automatic “time-windowed retention” with delete tracking out-of-the-box, combining CDC, timestamp-based filtering, and a scheduled cleanup can get you close to your goal. Let me know if you'd like a code example to get started!&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;&lt;P&gt;Brahma&lt;/P&gt;</description>
      <pubDate>Thu, 14 Aug 2025 02:30:22 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/merge-using-dlt/m-p/128414#M48229</guid>
      <dc:creator>Brahmareddy</dc:creator>
      <dc:date>2025-08-14T02:30:22Z</dc:date>
    </item>
    <item>
      <title>Re: Merge using DLT</title>
      <link>https://community.databricks.com/t5/data-engineering/merge-using-dlt/m-p/128423#M48235</link>
      <description>&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pyspark.sql.functions import col, lit, expr, when, to_timestamp, current_timestamp&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pyspark.sql.functions import max as max_&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;import dlt&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pyspark.sql.types import StructType, StructField, StringType&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pyspark.sql.utils import AnalysisException&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pyspark.sql.functions import col, struct, lit&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from typing import Optional, Tuple&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from pandas import DataFrame&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Define the source table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;source_table = &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;"LIVE.source"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Define the target table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;target_table = &lt;/SPAN&gt;&lt;SPAN&gt;"target"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;dlt.create_streaming_table(target_table)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;schema = StructType([&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;StructField(&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;StringType()&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;True)&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;StructField(&lt;/SPAN&gt;&lt;SPAN&gt;"rackName"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;StringType()&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;True)&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;StructField(&lt;/SPAN&gt;&lt;SPAN&gt;"clusterName"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;StringType()&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;True)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;])&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97035"&gt;@Dlt&lt;/a&gt;.table(name=&lt;/SPAN&gt;&lt;SPAN&gt;"source_89939"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def source_&lt;/SPAN&gt;&lt;SPAN&gt;89939&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Read from the source table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df = spark.readStream.format(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;).option(&lt;/SPAN&gt;&lt;SPAN&gt;"skipChangeCommits"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;).table(source_table)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Filter data from the last &lt;/SPAN&gt;&lt;SPAN&gt;24&lt;/SPAN&gt;&lt;SPAN&gt; hours&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; filtered_df = df.filter(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (col(&lt;/SPAN&gt;&lt;SPAN&gt;"tenant"&lt;/SPAN&gt;&lt;SPAN&gt;) == lit(&lt;/SPAN&gt;&lt;SPAN&gt;"vdi"&lt;/SPAN&gt;&lt;SPAN&gt;)) &amp;amp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (col(&lt;/SPAN&gt;&lt;SPAN&gt;"rack"&lt;/SPAN&gt;&lt;SPAN&gt;) != lit(&lt;/SPAN&gt;&lt;SPAN&gt;"unknown"&lt;/SPAN&gt;&lt;SPAN&gt;)) &amp;amp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (to_timestamp(col(&lt;/SPAN&gt;&lt;SPAN&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;), &lt;/SPAN&gt;&lt;SPAN&gt;"yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'"&lt;/SPAN&gt;&lt;SPAN&gt;) &amp;gt;= expr(&lt;/SPAN&gt;&lt;SPAN&gt;"current_timestamp() - interval 24 hours"&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Select the necessary columns&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; selected_df = filtered_df.select(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; col(&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; col(&lt;/SPAN&gt;&lt;SPAN&gt;"rack"&lt;/SPAN&gt;&lt;SPAN&gt;).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"rackName"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; col(&lt;/SPAN&gt;&lt;SPAN&gt;"hosts"&lt;/SPAN&gt;&lt;SPAN&gt;).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"hosts"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; to_timestamp(col(&lt;/SPAN&gt;&lt;SPAN&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;), &lt;/SPAN&gt;&lt;SPAN&gt;"yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'"&lt;/SPAN&gt;&lt;SPAN&gt;).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; return selected_df&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;dlt.apply_changes(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; target=target_table,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; source=&lt;/SPAN&gt;&lt;SPAN&gt;"source_89939"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; keys=[&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;],&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; sequence_by=col(&lt;/SPAN&gt;&lt;SPAN&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; name=&lt;/SPAN&gt;&lt;SPAN&gt;"initial"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;def next_snapshot_and_version(latest_snapshot_version: Optional[&lt;/SPAN&gt;&lt;SPAN&gt;int&lt;/SPAN&gt;&lt;SPAN&gt;]) -&amp;gt; Tuple[&lt;/SPAN&gt;&lt;SPAN&gt;Optional&lt;/SPAN&gt;&lt;SPAN&gt;[&lt;/SPAN&gt;&lt;SPAN&gt;int&lt;/SPAN&gt;&lt;SPAN&gt;], &lt;/SPAN&gt;&lt;SPAN&gt;DataFrame&lt;/SPAN&gt;&lt;SPAN&gt;]:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Read from the source table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; source_df = spark.read \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .format(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .option(&lt;/SPAN&gt;&lt;SPAN&gt;"ignoreDeletes"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;) \&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .table(source_table)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Read from the target table using dlt&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; target_df =dlt.read(&lt;/SPAN&gt;&lt;SPAN&gt;"700025_ctg_dev.test_ramu.target"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Perform the join operation&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; result_df = target_df.alias(&lt;/SPAN&gt;&lt;SPAN&gt;"t"&lt;/SPAN&gt;&lt;SPAN&gt;).join(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; source_df.alias(&lt;/SPAN&gt;&lt;SPAN&gt;"s"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; on=&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; how=&lt;/SPAN&gt;&lt;SPAN&gt;"left_anti"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; ).select(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; col(&lt;/SPAN&gt;&lt;SPAN&gt;"t.name"&lt;/SPAN&gt;&lt;SPAN&gt;).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; lit(None).alias(&lt;/SPAN&gt;&lt;SPAN&gt;"rackName"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; current_timestamp().alias(&lt;/SPAN&gt;&lt;SPAN&gt;"timestamp"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; # Return the latest snapshot version and the result DataFrame&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; return (latest_snapshot_version, result_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;dlt.create_auto_cdc_from_snapshot_flow(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; target = target_table,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; source = next_snapshot_and_version,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; keys = [&lt;/SPAN&gt;&lt;SPAN&gt;"name"&lt;/SPAN&gt;&lt;SPAN&gt;],&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; track_history_column_list = None,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; track_history_except_column_list = None&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;calculate a snaposhot and add them to target to make dlt do forceful delete&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;om.databricks.pipelines.common.errors.DLTSparkException: [&lt;/SPAN&gt;&lt;SPAN&gt;REFERENCE_DLT_DATASET_OUTSIDE_QUERY_DEFINITION&lt;/SPAN&gt;&lt;SPAN&gt;] Referencing DLT dataset `&lt;/SPAN&gt;&lt;SPAN&gt;700025&lt;/SPAN&gt;&lt;SPAN&gt;_ctg_dev`.`test_ramu`.`target` outside the dataset query definition (i.e., &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97035"&gt;@Dlt&lt;/a&gt;.table annotation) is not supported. Please read it instead inside the dataset query definition.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;getting this exception&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/DIV&gt;</description>
      <pubDate>Thu, 14 Aug 2025 07:03:56 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/merge-using-dlt/m-p/128423#M48235</guid>
      <dc:creator>Ramu1821</dc:creator>
      <dc:date>2025-08-14T07:03:56Z</dc:date>
    </item>
  </channel>
</rss>

