Type 2 SCD when using Auto Loader

matt_stanford
New Contributor III

Hi there! I'm pretty new to using Auto Loader, so this may be a really obvious fix, but it's stumped me for a few weeks, so I'm hoping someone can help! 

I have a small CSV file saved in ADLS with a list of pizzas for an imaginary pizza restaurant. I'm using Auto Loader to read this into a Delta table in my bronze layer. This all appears to be working as intended.
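For reference, the bronze ingestion looks roughly like this (the paths and table name here are placeholders rather than my exact setup):

# Sketch of the bronze ingestion - paths and table name are placeholders
bronze_df = (spark.readStream
    .format('cloudFiles')
    .option('cloudFiles.format', 'csv')
    .option('cloudFiles.schemaLocation', f'{bronze_path}/_schema')  # placeholder path
    .option('header', 'true')
    .load(source_path)  # placeholder path to the CSV folder in ADLS
)

(bronze_df.writeStream
    .option('checkpointLocation', f'{bronze_path}/_checkpoint')  # placeholder path
    .trigger(availableNow=True)
    .toTable(bronze_table_name)  # placeholder table name
)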

I've already created an empty Delta table in the silver layer. I'm then using Auto Loader to ingest the data from the bronze table into a dataframe, where I'll transform the data before writing it to the silver table.

I want to implement type 2 SCD, so I have an upsert_to_delta function, which I pass to the foreachBatch method when writing to the silver table. I've seen a few methods for applying type 2 SCD, but the most applicable one unions the dataframe with itself and adds a merge ID column, set to null for one half and to the primary key (in this case, pizza_id) for the other.

The challenge I'm having is that when I run the notebook, nothing is written to the silver table. I had got it working by using whenNotMatchedInsertAll without any conditions, but I need to insert only the rows where merge_id is null, otherwise I get duplicate rows.

If anyone has any ideas of where I've gone wrong, I'd be eternally grateful! 

 

 

from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_date, lit

def upsert_to_delta(input_df, batch_id):
    # Union the batch with itself: the copies with merge_id = pizza_id will
    # match (and close out) the current rows, while the copies with
    # merge_id = null fall through to the insert clause as the new versions.
    updates_df = (input_df
        .withColumn('merge_id', col('pizza_id'))
        .union(input_df.withColumn('merge_id', lit(None)))
    )

    delta_table = DeltaTable.forName(spark, silver_table_name)
    (delta_table
        .alias('original')
        .merge(updates_df.alias('updates'), "original.pizza_id = updates.merge_id AND original.status = 'a'")
        .whenMatchedUpdate(
            # Expire the current version of the row
            set={'end_date': current_date(), 'status': lit('i')}
        )
        .whenNotMatchedInsert(
            # Only insert the null-merge_id copies, to avoid duplicates
            condition='updates.merge_id IS NULL',
            values={
                'pizza_id': 'updates.pizza_id',
                'pizza_type_id': 'updates.pizza_type_id',
                'size': 'updates.size',
                'price': 'updates.price',
                'start_date': 'updates.start_date',
                'end_date': 'updates.end_date',
                'status': 'updates.status'
            }
        )
        .execute()
    )
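
And this is roughly how the function gets wired up when writing to the silver table, reading the bronze table as a stream (names and paths are placeholders):

# Sketch of the bronze-to-silver stream - names and paths are placeholders
silver_stream = (spark.readStream
    .table(bronze_table_name)  # stream new rows from the bronze Delta table
    # ... transformations go here before the merge ...
)

(silver_stream.writeStream
    .foreachBatch(upsert_to_delta)  # apply the merge to each micro-batch
    .option('checkpointLocation', silver_checkpoint_path)  # placeholder path
    .trigger(availableNow=True)
    .start()
)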

 

 

ACCEPTED SOLUTION

matt_stanford
New Contributor III

So, I figured out what the issue was. I needed to delete the checkpoint folder. After I did this and re-ran the notebook, everything worked fine!
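For anyone else who hits this, clearing the checkpoint is just a recursive delete of the folder that your checkpointLocation option points at (the path here is a placeholder):

# Delete the streaming checkpoint so the stream reprocesses from the start
dbutils.fs.rm(silver_checkpoint_path, recurse=True)  # placeholder path

Bear in mind this makes the stream reprocess everything from scratch, so it's really only something to do in development.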


