Hi there! I'm pretty new to using Auto Loader, so this may be a really obvious fix, but it's stumped me for a few weeks, so I'm hoping someone can help!
I have a small CSV file saved in ADLS with a list of pizzas for an imaginary pizza restaurant. I'm using Auto Loader to read this into a Delta table in my bronze layer. This all appears to be working as intended.
I've already created an empty Delta table in the silver layer. I'm then using Auto Loader to ingest the data from the bronze table into a DataFrame, where I'll transform the data before writing it to the silver table.
I want to implement type 2 SCD, so I have an upsert_to_delta function, which I pass to the foreachBatch method when writing to the silver table. I've seen a few methods for applying type 2 SCD, but the most applicable one unions the DataFrame with itself and adds a merge_id column, set to the primary key (in this case, pizza_id) in one half and to null in the other.
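To make the idea concrete, here's a minimal plain-Python sketch of how I understand the merge ID approach is meant to behave. It isn't Spark or Delta code: the helper names stage_updates and scd2_merge and the sample rows are made up for illustration, and it assumes each pizza_id appears at most once per batch.

```python
from datetime import date

def stage_updates(rows):
    """Duplicate each incoming row: one copy keyed on pizza_id, one with a null merge_id."""
    keyed = [{**r, "merge_id": r["pizza_id"]} for r in rows]
    unkeyed = [{**r, "merge_id": None} for r in rows]
    return keyed + unkeyed

def scd2_merge(target, updates, today):
    """Close matched active rows; insert only the rows whose merge_id is null."""
    result = [dict(t) for t in target]
    inserts = []
    for u in updates:
        # A null merge_id never equals a key, mirroring SQL NULL comparison.
        matches = [t for t in result
                   if u["merge_id"] is not None
                   and t["pizza_id"] == u["merge_id"]
                   and t["status"] == "a"]
        if matches:
            for t in matches:
                t["end_date"] = today
                t["status"] = "i"
        elif u["merge_id"] is None:
            # Not matched and null-keyed: this is the new version to insert.
            inserts.append({k: v for k, v in u.items() if k != "merge_id"})
    return result + inserts

# Hypothetical data: pizza 1 already exists and changes price, pizza 2 is new.
silver = [{"pizza_id": 1, "price": 10.0, "start_date": date(2024, 1, 1),
           "end_date": None, "status": "a"}]
batch = [{"pizza_id": 1, "price": 12.0, "start_date": date(2024, 6, 1),
          "end_date": None, "status": "a"},
         {"pizza_id": 2, "price": 9.0, "start_date": date(2024, 6, 1),
          "end_date": None, "status": "a"}]

out = scd2_merge(silver, stage_updates(batch), today=date(2024, 6, 1))
```

In this sketch the keyed copy of pizza 1 closes the old row, the keyed copy of pizza 2 matches nothing and is dropped, and both null-keyed copies are inserted as the new active versions.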
The challenge I'm having is that when I run the notebook, nothing is written to the silver table. I had got it working by using whenNotMatchedInsertAll without any conditions, but I need to insert only the rows where merge_id is null; otherwise I get duplicate rows.
If anyone has any ideas of where I've gone wrong, I'd be eternally grateful!
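from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_date, lit

# spark and silver_table_name are assumed to be defined earlier in the notebook.
def upsert_to_delta(input_df, batch_id):
    # Stage every incoming row twice: once keyed on pizza_id (to close the
    # current active row) and once with a null merge_id (to insert the new version).
    updates_df = (input_df
        .withColumn('merge_id', col('pizza_id'))
        .union(input_df.withColumn('merge_id', lit(None)))
    )

    delta_table = DeltaTable.forName(spark, silver_table_name)

    (delta_table
        .alias('original')
        .merge(updates_df.alias('updates'), "original.pizza_id = updates.merge_id AND original.status = 'a'")
        # Matched rows are the current active versions: mark them inactive.
        .whenMatchedUpdate(
            set={'original.end_date': current_date(), 'original.status': lit('i')}
        )
        # Insert only the null-keyed copies so each new version is added once.
        .whenNotMatchedInsert(
            condition='updates.merge_id IS NULL',
            values={
                'original.pizza_id': 'updates.pizza_id',
                'original.pizza_type_id': 'updates.pizza_type_id',
                'original.size': 'updates.size',
                'original.price': 'updates.price',
                'original.start_date': 'updates.start_date',
                'original.end_date': 'updates.end_date',
                'original.status': 'updates.status'
            }
        )
        .execute()
    )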