Hi, I'm using the medallion architecture, and the bronze layer (Auto Loader with output mode append) holds the full history. So I decided to use the silver layer to deduplicate bronze using the Change Data Feed. But when I run the code below to do the upsert, I get this error:
[AMBIGUOUS_REFERENCE_TO_FIELDS] Ambiguous reference to the field `_change_type`. It appears 2 times in the schema.
Here is the code:
from delta.tables import DeltaTable  # needed for DeltaTable.forName below
source_table = "prod_itsm.bronze_clientname.task"
target_table = "prod_itsm.silver_clientname.task"
checkpoint_relative_path = "/Volumes/prod_itsm/silver_clientname/databricks/autoloader/checkpoints/silver/itsm_clientname_task"
# Stream the Change Data Feed of the bronze table
df = (
    spark.readStream
    .format('delta')
    .option("readChangeFeed", "true")
    .table(source_table)
)
def upsert_via_cdf(df, batch_id, target_table):
    id_field = 'sys_id_value'
    view_name = "view_" + target_table.replace('bronze', 'silver').replace('.', '_')
    # ------------------------------------------
    # Getting the modified records at bronze
    # ------------------------------------------
    df.createOrReplaceGlobalTempView(view_name)
    query_cdf = f"""
        SELECT *
        FROM global_temp.{view_name}
        WHERE _change_type <> 'update_preimage'
        QUALIFY ROW_NUMBER() OVER (PARTITION BY {id_field} ORDER BY _commit_timestamp DESC, collect_date DESC) = 1
    """
    df_upsert = spark.sql(query_cdf)
    # --------------------------------------
    # Merge Upsert the records found
    # --------------------------------------
    deltadf = DeltaTable.forName(spark, target_table)
    (
        deltadf.alias("target")
        .merge(df_upsert.alias("source"), f"source.{id_field} = target.{id_field}")
        .whenMatchedUpdateAll(condition="source._change_type = 'update_postimage'")
        .whenNotMatchedInsertAll(condition="source._change_type = 'insert' OR source._change_type = 'update_postimage'")
        .execute()
    )
try:
    stream_query = (
        df.writeStream
        .option("checkpointLocation", checkpoint_relative_path)
        .foreachBatch(lambda batch_df, batch_id: upsert_via_cdf(batch_df, batch_id, target_table))
        .trigger(availableNow=True)
    ).start()
    #stream_query.awaitTermination()
except Exception as e:
    # Chain the original exception so its traceback is kept
    raise RuntimeError(str(e)) from e
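For clarity, the per-batch logic the code intends can be modelled in plain Python, outside Spark: drop `update_preimage` rows, keep the newest row per `sys_id_value` by `(_commit_timestamp, collect_date)`, then decide which MERGE branch each surviving row takes under the two conditions above. This is only a sketch of the SQL and MERGE conditions, assuming rows are dicts; the helper names and sample rows are hypothetical.

```python
from datetime import datetime

def latest_cdf_rows(rows, id_field="sys_id_value"):
    """Model of query_cdf: drop 'update_preimage' rows, then keep one row per id,
    the one with the greatest (_commit_timestamp, collect_date).
    On an exact tie the first row seen wins (ROW_NUMBER would pick one arbitrarily)."""
    latest = {}
    for row in rows:
        if row["_change_type"] == "update_preimage":
            continue
        key = row[id_field]
        order = (row["_commit_timestamp"], row["collect_date"])
        best = latest.get(key)
        if best is None or order > (best["_commit_timestamp"], best["collect_date"]):
            latest[key] = row
    return list(latest.values())

def merge_action(row, target_ids, id_field="sys_id_value"):
    """Model of the MERGE conditions: which branch a deduplicated row takes."""
    matched = row[id_field] in target_ids
    change = row["_change_type"]
    if matched and change == "update_postimage":
        return "update"  # whenMatchedUpdateAll branch
    if not matched and change in ("insert", "update_postimage"):
        return "insert"  # whenNotMatchedInsertAll branch
    return "skip"  # neither condition holds

# Hypothetical sample micro-batch
t1, t2 = datetime(2024, 1, 1), datetime(2024, 1, 2)
rows = [
    {"sys_id_value": "a", "_change_type": "insert", "_commit_timestamp": t1, "collect_date": t1},
    {"sys_id_value": "a", "_change_type": "insert", "_commit_timestamp": t2, "collect_date": t2},
    {"sys_id_value": "b", "_change_type": "update_preimage", "_commit_timestamp": t2, "collect_date": t2},
    {"sys_id_value": "b", "_change_type": "update_postimage", "_commit_timestamp": t2, "collect_date": t2},
]
deduped = latest_cdf_rows(rows)
for r in deduped:
    print(r["sys_id_value"], r["_change_type"], merge_action(r, target_ids={"a", "b"}))
```

Under these conditions, a row whose latest change is `insert` and whose id already exists in silver matches neither branch (`merge_action` returns `"skip"`). Since appends to a Delta table show up in the Change Data Feed as `insert`, this may be worth checking for an append-only bronze.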
So, what is the issue here? I assumed that the reserved CDF column names would be ignored by the upsert (MERGE), and since a global temp view has no Change Data Feed of its own, there should be no duplicate columns when it is created.

Those columns are not in my schema, so I can't find the issue.
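`_change_type` does come from the read itself: with `readChangeFeed` enabled, Delta appends `_change_type`, `_commit_version` and `_commit_timestamp` to the bronze table's own columns, so they are present in each micro-batch DataFrame (and in the global temp view created from it) even though they are not part of the table schema. A minimal pure-Python sketch, with hypothetical bronze column names and a hypothetical helper `split_cdf_columns`, that separates them from the data columns:

```python
# Metadata columns that a Delta Change Data Feed read appends to every row.
CDF_METADATA_COLUMNS = ("_change_type", "_commit_version", "_commit_timestamp")

def split_cdf_columns(columns):
    """Split a column list into (data_columns, cdf_metadata_columns), keeping order."""
    data = [c for c in columns if c not in CDF_METADATA_COLUMNS]
    cdf = [c for c in columns if c in CDF_METADATA_COLUMNS]
    return data, cdf

# Hypothetical column list of the streaming DataFrame read from bronze
batch_columns = ["sys_id_value", "collect_date", "_change_type", "_commit_version", "_commit_timestamp"]
data_cols, cdf_cols = split_cdf_columns(batch_columns)
print("table columns:", data_cols)
print("added by readChangeFeed:", cdf_cols)
```

In the batch function the list could come from `df_upsert.columns`, and `df_upsert.select(*data_cols)` would build a MERGE source without these three columns once the `_change_type` filtering has been applied to the rows. Whether that removes the ambiguous reference here is an assumption, not something this post confirms.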