02-20-2025 06:52 AM
Hi, I'm using the medallion architecture, and the bronze layer (Auto Loader with outputMode append) holds the full history. So I decided to use the silver zone to deduplicate bronze using the change data feed. But when I try to do the upsert, I get this message:
[AMBIGUOUS_REFERENCE_TO_FIELDS] Ambiguous reference to the field `_change_type`. It appears 2 times in the schema.
Here is the code...
source_table = "prod_itsm.bronze_clientname.task"
target_table = "prod_itsm.silver_clientname.task"
checkpoint_relative_path = "/Volumes/prod_itsm/silver_clientname/databricks/autoloader/checkpoints/silver/itsm_clientname_task"

df = (
    spark.readStream
    .format('delta')
    .option("readChangeFeed", "true")
    .table(source_table)
)

def upsert_via_cdf(df, batch_id, target_table):
    id_field = 'sys_id_value'
    view_name = "view_" + target_table.replace('bronze', 'silver').replace('.', '_')
    # ------------------------------------------
    # Getting the modified records at bronze
    # ------------------------------------------
    df.createOrReplaceGlobalTempView(view_name)
    query_cdf = f"""
        SELECT *
        FROM global_temp.{view_name}
        WHERE _change_type <> 'update_preimage'
        QUALIFY ROW_NUMBER() OVER (PARTITION BY {id_field} ORDER BY _commit_timestamp DESC, collect_date DESC) = 1
    """
    df_upsert = spark.sql(query_cdf)
    # --------------------------------------
    # Merge Upsert the records found
    # --------------------------------------
    deltadf = DeltaTable.forName(spark, target_table)
    (
        deltadf.alias("target")
        .merge(df_upsert.alias("source"), f"source.{id_field} = target.{id_field}")
        .whenMatchedUpdateAll(condition = "source._change_type = 'update_postimage'")
        .whenNotMatchedInsertAll(condition = "source._change_type = 'insert' OR source._change_type = 'update_postimage'")
        .execute()
    )

try:
    stream_query = (
        df.writeStream
        .option("checkpointLocation", checkpoint_relative_path)
        .foreachBatch(lambda batch_df, batch_id: upsert_via_cdf(batch_df, batch_id, target_table))
        .trigger(availableNow=True)
    ).start()
    #stream_query.awaitTermination()
except Exception as e:
    raise RuntimeError(str(e))
So, what is the issue here? I presumed that the reserved CDF columns would be ignored by the upsert, and that a global temp view has no change data feed of its own, so there should be no duplicate columns when it is created.
Those columns are not in my schema, so I cannot find the issue.
- Labels: Delta Lake, Spark
Accepted Solutions
02-22-2025 08:26 AM
How are you doing today? As I understand it, the error happens because _change_type appears twice in the schema, likely because of how the Change Data Feed (CDF) output is being processed. This can happen if SELECT * pulls in more than one copy of _change_type, or if the global temp view ends up with duplicated columns. To fix this, try explicitly selecting only the columns you need in query_cdf, check df_upsert.printSchema() to confirm whether there are duplicates, and make sure _change_type isn't unintentionally carried into the target by the merge. Also, avoid createOrReplaceGlobalTempView() if it is causing schema issues. Give these a shot and see if it helps!
Regards,
Brahma
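To make the two checks above concrete, here is a minimal sketch in plain Python. It only works on a list of column names, so it can be fed `df_upsert.columns` in a notebook. The helper names `duplicate_columns` and `data_columns` are made up for illustration and are not part of any Databricks API. The comparison is case-insensitive on the assumption that the session uses Spark's default (non case-sensitive) column resolution.

```python
from collections import Counter

# Metadata columns that the change data feed adds to every row.
CDF_COLUMNS = ("_change_type", "_commit_version", "_commit_timestamp")


def duplicate_columns(columns):
    """Return the column names that occur more than once (case-insensitive)."""
    counts = Counter(name.lower() for name in columns)
    return sorted(name for name, count in counts.items() if count > 1)


def data_columns(columns, exclude=CDF_COLUMNS):
    """Return the columns to carry into the target, without CDF metadata."""
    return [name for name in columns if name not in exclude]


# Hypothetical column list, standing in for df_upsert.columns.
sample = ["sys_id_value", "collect_date", "_change_type",
          "_commit_version", "_commit_timestamp"]
print(duplicate_columns(sample))
print(data_columns(sample))
```

If `duplicate_columns(df_upsert.columns)` comes back empty, the source DataFrame itself is fine and the clash is more likely to come from what the merge writes into the target, which is where an explicit column list helps.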
02-24-2025 04:44 AM
Hi @Brahmareddy, that works.
# Imports needed by the code below
from delta.tables import DeltaTable
from pyspark.sql import functions as F

def __remove_cdf_columns(df, exclude_columns = ['_change_type', '_commit_version', '_commit_timestamp']):
    # Build the target <- source column mapping, leaving out the CDF metadata columns
    columns_list = df.columns
    columns_list.sort()
    update_columns = {}
    for column_name in columns_list:
        if column_name not in exclude_columns:
            update_columns.update({f"target.{column_name}": f"source.{column_name}"})
    return update_columns

def upsert_via_cdf(df, batch_id, target_table):
    id_field = 'sys_id_value'
    view_name = "view_" + target_table.replace('.', '_')
    # ------------------------------------------
    # Getting the modified records at bronze
    # ------------------------------------------
    df.createOrReplaceTempView(view_name)
    query_cdf = f"""
        SELECT *
        FROM {view_name}
        WHERE _change_type <> 'update_preimage'
        QUALIFY ROW_NUMBER() OVER (PARTITION BY {id_field} ORDER BY _commit_timestamp DESC, collect_date DESC) = 1
    """
    df_upsert = spark.sql(query_cdf)
    # --------------------------------------
    # Merge Upsert the records found
    # --------------------------------------
    # Only the data columns are written; _change_type is used in the conditions only
    columns = __remove_cdf_columns(df)
    deltadf = DeltaTable.forName(spark, target_table)
    (
        deltadf.alias("target")
        .merge(df_upsert.alias("source"), f"source.{id_field} = target.{id_field}")
        .whenMatchedUpdate(set = columns, condition = F.col("source._change_type") == 'update_postimage')
        .whenNotMatchedInsert(values = columns, condition = F.col("source._change_type") == 'insert')
        .execute()
    )
Thanks.
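For anyone reading later, the deduplication step in query_cdf (drop `update_preimage` rows, then keep the newest row per `sys_id_value`) can be pictured with a small plain-Python sketch. This is only an illustration of the intended semantics, not what Spark runs: the function name `latest_change_per_key` and the sample rows are hypothetical, and the timestamps are simplified to integers.

```python
def latest_change_per_key(rows, id_field="sys_id_value"):
    """Keep one row per key: the newest change that is not an update_preimage."""
    latest = {}
    for row in rows:
        if row["_change_type"] == "update_preimage":
            continue  # pre-images describe the old state, so they are skipped
        order = (row["_commit_timestamp"], row["collect_date"])
        current = latest.get(row[id_field])
        if current is None or order > (current["_commit_timestamp"], current["collect_date"]):
            latest[row[id_field]] = row
    return latest


# Hypothetical change rows for two keys.
rows = [
    {"sys_id_value": "a", "_change_type": "insert", "_commit_timestamp": 1, "collect_date": 1},
    {"sys_id_value": "a", "_change_type": "update_preimage", "_commit_timestamp": 2, "collect_date": 1},
    {"sys_id_value": "a", "_change_type": "update_postimage", "_commit_timestamp": 2, "collect_date": 2},
    {"sys_id_value": "b", "_change_type": "insert", "_commit_timestamp": 1, "collect_date": 1},
]

result = latest_change_per_key(rows)
for key in sorted(result):
    print(key, result[key]["_change_type"])
```

One difference worth knowing: when two rows tie on both ordering columns, this sketch keeps the first one it sees, whereas ROW_NUMBER() in SQL may pick either.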

