<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Type 2 SCD when using Auto Loader in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/type-2-scd-when-using-auto-loader/m-p/59238#M31345</link>
    <description>&lt;P&gt;Hi there! I'm pretty new to Auto Loader, so this may be a really obvious fix, but it's stumped me for a few weeks, so I'm hoping someone can help!&lt;/P&gt;&lt;P&gt;I have a small CSV file saved in ADLS with a list of pizzas for an imaginary pizza restaurant. I'm using Auto Loader to read it into a Delta table in my bronze layer. This all appears to be working as intended.&lt;/P&gt;&lt;P&gt;I've already created an empty Delta table in the silver layer. I'm then using Auto Loader to ingest the data from the bronze table into a DataFrame, where I'll transform the data before writing it to the silver table.&lt;/P&gt;&lt;P&gt;I want to implement type 2 SCD, so I have an &lt;EM&gt;upsert_to_delta&lt;/EM&gt; function, which I pass to the foreachBatch method when writing to the silver table. I've seen a few methods for applying type 2 SCD, but the most applicable one unions the DataFrame with itself and adds a merge ID column that is null in one copy and set to the primary key (in this case, pizza_id) in the other.&lt;/P&gt;&lt;P&gt;The challenge I'm having is that when I run the notebook, nothing is written to the silver table. I had got it working by using &lt;EM&gt;whenNotMatchedInsertAll&lt;/EM&gt; without any conditions, but I need to insert only rows where the &lt;EM&gt;merge_id&lt;/EM&gt; is null, otherwise I get duplicate rows.&lt;/P&gt;&lt;P&gt;If anyone has any ideas of where I've gone wrong, I'd be eternally grateful!&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_date, lit

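def upsert_to_delta(input_df, batch_id):
    # Called by foreachBatch for each micro-batch; spark and silver_table_name
    # are expected to be defined elsewhere in the notebook.

    # Stage every incoming row twice: once keyed on pizza_id, to close the
    # currently active row, and once with a null merge_id, to insert the new version.
    updates_df = (input_df
        .withColumn('merge_id', col('pizza_id'))
        .union(input_df.withColumn('merge_id', lit(None)))
    )

    delta_table = DeltaTable.forName(spark, silver_table_name)
    (delta_table
        .alias('original')
        .merge(updates_df.alias('updates'), "original.pizza_id = updates.merge_id AND original.status = 'a'")
        # Matched rows are the active versions being superseded: mark them inactive.
        .whenMatchedUpdate(
            set={'original.end_date': current_date(), 'original.status': lit('i')}
        )
        # Rows with a null merge_id never match, so they are inserted as new versions.
        .whenNotMatchedInsert(
            condition='updates.merge_id IS NULL',
            values={
                'original.pizza_id': 'updates.pizza_id',
                'original.pizza_type_id': 'updates.pizza_type_id',
                'original.size': 'updates.size',
                'original.price': 'updates.price',
                'original.start_date': 'updates.start_date',
                'original.end_date': 'updates.end_date',
                'original.status': 'updates.status'
            }
        )
        .execute()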
    )&lt;/LI-CODE&gt;</description>
    <pubDate>Sun, 04 Feb 2024 12:35:52 GMT</pubDate>
    <dc:creator>matt_stanford</dc:creator>
    <dc:date>2024-02-04T12:35:52Z</dc:date>
    <item>
      <title>Type 2 SCD when using Auto Loader</title>
      <link>https://community.databricks.com/t5/data-engineering/type-2-scd-when-using-auto-loader/m-p/59238#M31345</link>
      <description>&lt;P&gt;Hi there! I'm pretty new to Auto Loader, so this may be a really obvious fix, but it's stumped me for a few weeks, so I'm hoping someone can help!&lt;/P&gt;&lt;P&gt;I have a small CSV file saved in ADLS with a list of pizzas for an imaginary pizza restaurant. I'm using Auto Loader to read it into a Delta table in my bronze layer. This all appears to be working as intended.&lt;/P&gt;&lt;P&gt;I've already created an empty Delta table in the silver layer. I'm then using Auto Loader to ingest the data from the bronze table into a DataFrame, where I'll transform the data before writing it to the silver table.&lt;/P&gt;&lt;P&gt;I want to implement type 2 SCD, so I have an &lt;EM&gt;upsert_to_delta&lt;/EM&gt; function, which I pass to the foreachBatch method when writing to the silver table. I've seen a few methods for applying type 2 SCD, but the most applicable one unions the DataFrame with itself and adds a merge ID column that is null in one copy and set to the primary key (in this case, pizza_id) in the other.&lt;/P&gt;&lt;P&gt;The challenge I'm having is that when I run the notebook, nothing is written to the silver table. I had got it working by using &lt;EM&gt;whenNotMatchedInsertAll&lt;/EM&gt; without any conditions, but I need to insert only rows where the &lt;EM&gt;merge_id&lt;/EM&gt; is null, otherwise I get duplicate rows.&lt;/P&gt;&lt;P&gt;If anyone has any ideas of where I've gone wrong, I'd be eternally grateful!&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_date, lit

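def upsert_to_delta(input_df, batch_id):
    # Called by foreachBatch for each micro-batch; spark and silver_table_name
    # are expected to be defined elsewhere in the notebook.

    # Stage every incoming row twice: once keyed on pizza_id, to close the
    # currently active row, and once with a null merge_id, to insert the new version.
    updates_df = (input_df
        .withColumn('merge_id', col('pizza_id'))
        .union(input_df.withColumn('merge_id', lit(None)))
    )

    delta_table = DeltaTable.forName(spark, silver_table_name)
    (delta_table
        .alias('original')
        .merge(updates_df.alias('updates'), "original.pizza_id = updates.merge_id AND original.status = 'a'")
        # Matched rows are the active versions being superseded: mark them inactive.
        .whenMatchedUpdate(
            set={'original.end_date': current_date(), 'original.status': lit('i')}
        )
        # Rows with a null merge_id never match, so they are inserted as new versions.
        .whenNotMatchedInsert(
            condition='updates.merge_id IS NULL',
            values={
                'original.pizza_id': 'updates.pizza_id',
                'original.pizza_type_id': 'updates.pizza_type_id',
                'original.size': 'updates.size',
                'original.price': 'updates.price',
                'original.start_date': 'updates.start_date',
                'original.end_date': 'updates.end_date',
                'original.status': 'updates.status'
            }
        )
        .execute()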
    )&lt;/LI-CODE&gt;</description>
      <pubDate>Sun, 04 Feb 2024 12:35:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/type-2-scd-when-using-auto-loader/m-p/59238#M31345</guid>
      <dc:creator>matt_stanford</dc:creator>
      <dc:date>2024-02-04T12:35:52Z</dc:date>
    </item>
    <item>
      <title>Re: Type 2 SCD when using Auto Loader</title>
      <link>https://community.databricks.com/t5/data-engineering/type-2-scd-when-using-auto-loader/m-p/59258#M31352</link>
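      <description>&lt;P&gt;So, I figured out what the issue was: I needed to delete the checkpoint folder. Presumably the old checkpoint had already recorded the bronze data as processed, so the stream had nothing new to pass to &lt;EM&gt;upsert_to_delta&lt;/EM&gt;. After I deleted it and re-ran the notebook, everything worked fine!&lt;/P&gt;</description>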
      <pubDate>Mon, 05 Feb 2024 08:16:00 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/type-2-scd-when-using-auto-loader/m-p/59258#M31352</guid>
      <dc:creator>matt_stanford</dc:creator>
      <dc:date>2024-02-05T08:16:00Z</dc:date>
    </item>
  </channel>
</rss>

