<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Incremental join transformation using Delta live tables in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64749#M32643</link>
    <description>&lt;P&gt;I do not use DLT, but it seems that what you want to achieve is not actually a direct merge on a target table&lt;BR /&gt;I think you want to combine the new incoming data only and append/merge that to the target table, correct?&lt;BR /&gt;If so, I would treat it like that: a join on the new data which is overwritten completely at each run, and this is appended to the target table.&lt;BR /&gt;Take&lt;/P&gt;</description>
    <pubDate>Wed, 27 Mar 2024 07:53:34 GMT</pubDate>
    <dc:creator>-werners-</dc:creator>
    <dc:date>2024-03-27T07:53:34Z</dc:date>
    <item>
      <title>Incremental join transformation using Delta live tables</title>
      <link>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64725#M32638</link>
      <description>&lt;P&gt;I'm attempting to build an incremental data processing pipeline using delta live tables. The aim to stream data from a source multiple times in a day and join the data within the specific increment only.&lt;/P&gt;&lt;P&gt;I'm using autoloader to load the data incrementally from source.&lt;/P&gt;&lt;P&gt;I've taken a sample of 2 tables which I'm attempting to join with each other and produce a gold table. When I run the pipeline for the first time, the tables are joined correctly and the count of records in the gold table is correct.&lt;/P&gt;&lt;P&gt;In the next step, I attempt to load 2 records each in the input tables and reran the entire pipeline. I was expecting the final table row count to be incremented by 2. However I found that the number was &amp;gt; 2. On closer inspection, I realized that even though only 2 records were streamed in for each of the input tables, the join occurred with the entire table.&lt;BR /&gt;Is there a way I can have only the delta of each file be joined with each other rather than have the join occur over the entire table?&lt;/P&gt;&lt;P&gt;The joining step looks like this.&lt;/P&gt;&lt;PRE&gt;def load_table_to_gold(sqlstmt,temp,outtable):  
    @dlt.table(name=f"GOLD_{outtable}",temporary=temp, comment=f"This is my gold table GOLD_{outtable}")
    def gold_cold():
        golddf = spark.sql("SELECT c.customer_id, c.name, c.mobile_number, o.order_id, o.total_amount, c.ingested_date FROM STREAM LIVE.SILVER_Cust AS c INNER JOIN STREAM LIVE.SILVER_Orders AS o ON c.customer_id = o.customer_id")
        return golddf&lt;/PRE&gt;&lt;P&gt;&lt;SPAN&gt;As you can see from the pic, it shows that the 2 incoming tables streamed in 2 records each, however, the join that occurred spat out 19 records into the Gold table when I was expecting 2.&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="pic.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/6829i3D1041A6E6D72A86/image-size/medium/is-moderation-mode/true?v=v2&amp;amp;px=400" role="button" title="pic.png" alt="pic.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt; &lt;/P&gt;</description>
      <pubDate>Wed, 27 Mar 2024 02:03:03 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64725#M32638</guid>
      <dc:creator>Edthehead</dc:creator>
      <dc:date>2024-03-27T02:03:03Z</dc:date>
    </item>
    <item>
      <title>Re: Incremental join transformation using Delta live tables</title>
      <link>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64749#M32643</link>
      <description>&lt;P&gt;I do not use DLT, but it seems that what you want to achieve is not actually a direct merge on a target table&lt;BR /&gt;I think you want to combine the new incoming data only and append/merge that to the target table, correct?&lt;BR /&gt;If so, I would treat it like that: a join on the new data which is overwritten completely at each run, and this is appended to the target table.&lt;BR /&gt;Take&lt;/P&gt;</description>
      <pubDate>Wed, 27 Mar 2024 07:53:34 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64749#M32643</guid>
      <dc:creator>-werners-</dc:creator>
      <dc:date>2024-03-27T07:53:34Z</dc:date>
    </item>
    <item>
      <title>Re: Incremental join transformation using Delta live tables</title>
      <link>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64751#M32644</link>
      <description>&lt;P&gt;Well yes, that is exactly what I would like to do. I want to do a join only within the new data and then append into the new tables. So if there are 2 new records in each input table, the append should also 2 new records. However, if I use streaming or DLT, the join is being done with the entire table and not just within the incremental new data. Can I achieve what I need with DLT? If not, what can I do using standard pyspark? Another option is using change data feed but I'm yet to explore that.&lt;/P&gt;</description>
      <pubDate>Wed, 27 Mar 2024 08:01:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64751#M32644</guid>
      <dc:creator>Edthehead</dc:creator>
      <dc:date>2024-03-27T08:01:07Z</dc:date>
    </item>
    <item>
      <title>Re: Incremental join transformation using Delta live tables</title>
      <link>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64755#M32647</link>
      <description>&lt;P&gt;I suppose it is possible in dlt too, but in plain spark I would join the input tables into a dataframe, and that dataframe is then appended to the target table.&lt;/P&gt;</description>
      <pubDate>Wed, 27 Mar 2024 08:23:45 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64755#M32647</guid>
      <dc:creator>-werners-</dc:creator>
      <dc:date>2024-03-27T08:23:45Z</dc:date>
    </item>
    <item>
      <title>Re: Incremental join transformation using Delta live tables</title>
      <link>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64765#M32652</link>
      <description>&lt;P&gt;When I try joining the 2 tables using streaming, the entire tables are joined with each other rather than just the incremental changes with each other. I'm really joining 2 tables which are being incremented in frequent intervals.&amp;nbsp;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 27 Mar 2024 08:56:31 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64765#M32652</guid>
      <dc:creator>Edthehead</dc:creator>
      <dc:date>2024-03-27T08:56:31Z</dc:date>
    </item>
    <item>
      <title>Re: Incremental join transformation using Delta live tables</title>
      <link>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64767#M32654</link>
      <description>&lt;P&gt;basically you want to do a &lt;A href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins" target="_self"&gt;stream-stream join&lt;/A&gt;.&amp;nbsp; If you want to do that you need to take a few things into account (see link).&lt;BR /&gt;DLT might do this for you, but I never used it so I cannot confirm that.&lt;BR /&gt;If your source tables are delta tables, you could indeed use data feed.&lt;BR /&gt;Another option is to not use streaming and selecting the data from the source tables using a simple where condition.&lt;BR /&gt;Of course this will work but there is no management of state (what if a run has failed, what was the last update I did etc, unless you store some kind of watermark somewhere (without usign streams).&lt;/P&gt;</description>
      <pubDate>Wed, 27 Mar 2024 09:06:21 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/incremental-join-transformation-using-delta-live-tables/m-p/64767#M32654</guid>
      <dc:creator>-werners-</dc:creator>
      <dc:date>2024-03-27T09:06:21Z</dc:date>
    </item>
  </channel>
</rss>

