Type 2 SCD when using Auto Loader

matt_stanford
New Contributor III

Hi there! I'm pretty new to Auto Loader, so this may be a really obvious fix, but it has stumped me for a few weeks and I'm hoping someone can help!

I have a small CSV file saved in ADLS with a list of pizzas for an imaginary pizza restaurant. I'm using Auto Loader to read it into a Delta table in my bronze layer. This all appears to be working as intended.
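The question doesn't include the bronze ingestion code. A minimal Auto Loader sketch of that step might look like the following. The function name `start_bronze_ingest`, the paths and the table name are hypothetical placeholders, and `trigger(availableNow=True)` needs PySpark 3.3 or later.

```python
def start_bronze_ingest(spark, source_path, schema_location, checkpoint_path, bronze_table):
    """Hypothetical helper: incrementally load new CSV files into a bronze Delta table."""
    return (
        spark.readStream.format("cloudFiles")                  # Auto Loader file source
        .option("cloudFiles.format", "csv")                    # input files are CSV
        .option("cloudFiles.schemaLocation", schema_location)  # where the inferred schema is tracked
        .option("header", "true")                              # first line holds column names
        .load(source_path)
        .writeStream
        .option("checkpointLocation", checkpoint_path)         # records which files were already ingested
        .trigger(availableNow=True)                            # process what is there now, then stop
        .toTable(bronze_table)
    )
```

Because Auto Loader tracks processed files in the checkpoint, rerunning this only picks up files it hasn't seen before.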

I've already created an empty Delta table in the silver layer. I'm then using Auto Loader to ingest the data from the bronze table into a DataFrame, where I'll transform it before writing it to the silver table.
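The silver step described here (streaming the bronze table through `upsert_to_delta` via `foreachBatch`) might be wired up roughly as below. This is a sketch under assumptions: `start_silver_stream`, the table name and the checkpoint path are hypothetical, since the question doesn't show this code. Strictly speaking, `spark.readStream.table(...)` on a Delta table is a Delta streaming source rather than Auto Loader, which is the `cloudFiles` file source.

```python
def start_silver_stream(spark, bronze_table, checkpoint_path, upsert_fn):
    """Hypothetical helper: feed each micro-batch of the bronze table to upsert_fn."""
    return (
        spark.readStream.table(bronze_table)            # Delta table as a streaming source
        .writeStream
        .foreachBatch(upsert_fn)                        # called as upsert_fn(batch_df, batch_id)
        .option("checkpointLocation", checkpoint_path)  # progress through the bronze table is recorded here
        .trigger(availableNow=True)                     # process all available data, then stop
        .start()
    )
```

Usage would be along the lines of `start_silver_stream(spark, "<bronze table>", "<checkpoint path>", upsert_to_delta)`. The checkpoint is what lets the next run continue from where the previous one stopped.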

I want to implement a Type 2 SCD, so I have an upsert_to_delta function that I pass to foreachBatch when writing to the silver table. I've seen a few methods for applying Type 2 SCD, but the most applicable one unions the DataFrame with itself and adds a merge_id column: in one copy it is set to the primary key (here, pizza_id) and in the other it is null.
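To see what the union and the conditional insert do, here is a toy, pure-Python model of that MERGE (no Spark involved). Rows are plain dicts; the `'a'`/`'i'` status values and string dates mirror the `upsert_to_delta` code below, while `stage_updates` and `scd2_merge` are hypothetical names. It assumes at most one incoming row per pizza_id, since a real Delta MERGE fails when several source rows try to modify the same target row.

```python
def stage_updates(incoming):
    """Union trick: each row appears twice, once keyed on pizza_id and once with merge_id = None."""
    keyed = [{**row, "merge_id": row["pizza_id"]} for row in incoming]
    unkeyed = [{**row, "merge_id": None} for row in incoming]
    return keyed + unkeyed


def scd2_merge(target, incoming, today):
    """Toy Type 2 SCD merge on a list of dicts (modified in place and returned).

    ON target.pizza_id = updates.merge_id AND target.status = 'a'
      WHEN MATCHED                          -> end_date = today, status = 'i'
      WHEN NOT MATCHED AND merge_id IS NULL -> insert the staged row
    """
    staged = stage_updates(incoming)
    # MERGE joins against the target as it was before the merge started.
    active = {row["pizza_id"]: row for row in target if row["status"] == "a"}
    new_rows = []
    for upd in staged:
        # A NULL merge_id never satisfies the equality, so only keyed copies can match.
        match = active.get(upd["merge_id"]) if upd["merge_id"] is not None else None
        if match is not None:
            match["end_date"] = today
            match["status"] = "i"
        elif upd["merge_id"] is None:
            new_rows.append({k: v for k, v in upd.items() if k != "merge_id"})
    target.extend(new_rows)
    return target


silver = [{"pizza_id": 1, "price": 10.0, "start_date": "2023-01-01", "end_date": None, "status": "a"}]
batch = [
    {"pizza_id": 1, "price": 12.0, "start_date": "2023-02-01", "end_date": None, "status": "a"},  # price change
    {"pizza_id": 2, "price": 9.0, "start_date": "2023-02-01", "end_date": None, "status": "a"},   # new pizza
]
scd2_merge(silver, batch, today="2023-02-01")
for row in silver:
    print(row["pizza_id"], row["price"], row["status"], row["end_date"])
# 1 10.0 i 2023-02-01
# 1 12.0 a None
# 2 9.0 a None
```

As in the original code, a pizza that appears in a batch unchanged is still expired and re-inserted; avoiding that would require comparing attribute columns while staging, which this sketch does not do.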

The problem is that when I run the notebook, nothing is written to the silver table. I had it working with whenNotMatchedInsertAll and no condition, but I need to insert only the rows where merge_id is null; otherwise I get duplicate rows.
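Why the condition matters, especially on the first run against an empty silver table, can be shown with a tiny counting sketch (pure Python; `inserted_ids` is a hypothetical helper). It counts, per pizza_id, how many staged copies reach the insert clause with and without `condition='updates.merge_id IS NULL'`. Keys that already have an active row get one insert either way, but brand-new keys (every row, when silver starts empty) get both copies inserted unless the condition is present.

```python
from collections import Counter


def inserted_ids(active_ids, incoming_ids, require_null_merge_id):
    """Count rows per pizza_id that the not-matched insert clause would add."""
    counts = Counter()
    for pid in incoming_ids:
        # Each incoming row is staged twice: merge_id = pid and merge_id = None.
        for merge_id in (pid, None):
            if merge_id is not None and merge_id in active_ids:
                continue  # matched: handled by whenMatchedUpdate, not inserted
            if require_null_merge_id and merge_id is not None:
                continue  # filtered out by condition='updates.merge_id IS NULL'
            counts[pid] += 1
    return counts


print(dict(inserted_ids(set(), [1, 2], require_null_merge_id=False)))  # {1: 2, 2: 2}
print(dict(inserted_ids(set(), [1, 2], require_null_merge_id=True)))   # {1: 1, 2: 1}
print(dict(inserted_ids({1}, [1, 2], require_null_merge_id=False)))    # {1: 1, 2: 2}
```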

If anyone has any ideas of where I've gone wrong, I'd be eternally grateful! 

 

 

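from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_date, lit

def upsert_to_delta(input_df, batch_id):
    # Stage every incoming row twice:
    #   merge_id = pizza_id -> can match the active silver row, which is then expired
    #   merge_id = NULL     -> never matches, so it reaches the insert as the new version
    updates_df = (input_df
        .withColumn('merge_id', col('pizza_id'))
        .union(input_df.withColumn('merge_id', lit(None).cast(input_df.schema['pizza_id'].dataType)))
    )

    # `spark` and `silver_table_name` come from the notebook scope
    delta_table = DeltaTable.forName(spark, silver_table_name)
    (delta_table
        .alias('original')
        .merge(updates_df.alias('updates'), "original.pizza_id = updates.merge_id AND original.status = 'a'")
        # Close off the currently active version of each pizza in this batch
        .whenMatchedUpdate(
            set={'original.end_date': current_date(), 'original.status': lit('i')}
        )
        # Insert only the NULL-keyed copy, so each pizza gets exactly one new active row
        .whenNotMatchedInsert(
            condition='updates.merge_id IS NULL',
            values={
                'original.pizza_id': 'updates.pizza_id',
                'original.pizza_type_id': 'updates.pizza_type_id',
                'original.size': 'updates.size',
                'original.price': 'updates.price',
                'original.start_date': 'updates.start_date',
                'original.end_date': 'updates.end_date',
                'original.status': 'updates.status'
            }
        )
        .execute()
    )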

 

 

Accepted Solution

matt_stanford
New Contributor III

So, I figured out what the issue was: I needed to delete the checkpoint folder. After I did this and re-ran the notebook, everything worked fine!
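A likely explanation, sketched as a toy pure-Python model (the `ToyCheckpointedStream` class is hypothetical, no Spark): a streaming checkpoint records which source data has already been processed. If an earlier run consumed the bronze data while the merge wrote nothing, that progress was still committed, so later runs with the corrected merge found nothing new to process. Deleting the checkpoint (on Databricks, e.g. `dbutils.fs.rm(checkpoint_path, True)`) or pointing `checkpointLocation` at a fresh path makes the stream start again from the beginning of the bronze table.

```python
class ToyCheckpointedStream:
    """Toy model of a streaming query whose progress lives in a checkpoint (a dict here)."""

    def __init__(self, checkpoint):
        self.checkpoint = checkpoint  # survives between runs, like the checkpoint folder

    def run(self, source_versions, process_batch):
        last = self.checkpoint.get("last_version", -1)
        pending = [v for v in source_versions if v > last]
        if pending:
            process_batch(pending)
            # Progress is committed even if process_batch wrote nothing useful.
            self.checkpoint["last_version"] = max(pending)
        return pending


checkpoint = {}
written = []
ToyCheckpointedStream(checkpoint).run([0, 1], lambda batch: None)  # faulty merge: writes nothing
ToyCheckpointedStream(checkpoint).run([0, 1], written.extend)      # corrected merge, same checkpoint
print(written)  # []  (versions 0 and 1 are already marked as processed)
checkpoint.clear()                                                 # "delete the checkpoint folder"
ToyCheckpointedStream(checkpoint).run([0, 1], written.extend)
print(written)  # [0, 1]
```

Keep in mind that starting over reprocesses the whole bronze table, so on a silver table that already holds data, every existing pizza goes through the merge again.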


Kaniz
Community Manager

That’s a great tip! Thank you for sharing your knowledge with the community. I’m sure many people will find it helpful. Keep up the good work! 😊
