Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Auto Loader with CDF not ignoring reserved columns

rafaelcavalcant
New Contributor II

Hi, I'm using the medallion architecture, and my bronze layer (Auto Loader with output mode append) keeps the full history. So I decided to use the silver zone to dedupe the bronze data using the Change Data Feed. But when I try to do the upsert, I get this message:

[AMBIGUOUS_REFERENCE_TO_FIELDS] Ambiguous reference to the field `_change_type`. It appears 2 times in the schema.

Here is the code...

 

from delta.tables import DeltaTable

source_table = "prod_itsm.bronze_clientname.task"
target_table = "prod_itsm.silver_clientname.task"
checkpoint_relative_path = "/Volumes/prod_itsm/silver_clientname/databricks/autoloader/checkpoints/silver/itsm_clientname_task"

# Read the bronze table as a Change Data Feed stream
df = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .table(source_table)
)
def upsert_via_cdf(df, batch_id, target_table):

    id_field = 'sys_id_value'
    view_name = "view_" + target_table.replace('bronze', 'silver').replace('.', '_')

    # ------------------------------------------
    # Getting the modified records at bronze
    # ------------------------------------------
    df.createOrReplaceGlobalTempView(view_name)
    query_cdf = f"""
        SELECT *
        FROM global_temp.{view_name}
        WHERE _change_type <> 'update_preimage'
        QUALIFY ROW_NUMBER() OVER (PARTITION BY {id_field} ORDER BY _commit_timestamp DESC, collect_date DESC) = 1
    """
    df_upsert = spark.sql(query_cdf)

    # --------------------------------------
    # Merge Upsert the records found
    # --------------------------------------
    deltadf = DeltaTable.forName(spark, target_table)
    (
        deltadf.alias("target")
        .merge(df_upsert.alias("source"), f"source.{id_field} = target.{id_field}") 
        .whenMatchedUpdateAll(condition = "source._change_type = 'update_postimage'")
        .whenNotMatchedInsertAll(condition = "source._change_type = 'insert' OR source._change_type = 'update_postimage'")
        .execute()
    )
try:
    stream_query = (
        df.writeStream
        .option("checkpointLocation", checkpoint_relative_path)
        .foreachBatch(lambda batch_df, batch_id: upsert_via_cdf(batch_df, batch_id, target_table))
        .trigger(availableNow=True)
        .start()
    )
    # Block until the batch finishes so failures surface here
    stream_query.awaitTermination()
except Exception as e:
    raise RuntimeError(str(e)) from e

 

So, what is the issue here? I presumed the reserved CDF column names would be ignored by the upsert, and since a global temp view has no Change Data Feed of its own, there should be no duplicate columns when it is created.

(Screenshot attached showing the reported columns.)

Those columns are not in my schema, so I cannot find the issue.
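For reference, this is how the reserved columns can be seen coming from the CDF read itself (a quick sketch; the startingVersion value is just for illustration):

# Sketch: batch CDF read to inspect the returned schema.
# A CDF read appends _change_type, _commit_version and
# _commit_timestamp on top of the table's own columns.
cdf_check = (
    spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)  # illustrative starting point
    .table(source_table)
)
cdf_check.printSchema()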

 

 

2 REPLIES

Brahmareddy
Honored Contributor II (accepted solution)

Hi @rafaelcavalcant 

How are you doing today? As per my understanding, the error happens because _change_type appears twice in your schema, most likely due to how you're processing the Change Data Feed (CDF). This can happen if SELECT * pulls in multiple copies of _change_type or if the global temp view duplicates columns. To fix it, try explicitly selecting only the necessary columns in your query_cdf, check df_upsert.printSchema() to confirm the duplicates, and make sure _change_type isn't unintentionally added again in the merge conditions. Also, avoid createOrReplaceGlobalTempView() if it's causing schema issues. Give these a shot and see if it helps!
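For example, something along these lines inside upsert_via_cdf (a rough sketch; any field besides sys_id_value, collect_date, and the CDF columns is illustrative):

# Sketch: select columns explicitly instead of SELECT *
query_cdf = f"""
    SELECT {id_field}, collect_date, _change_type, _commit_timestamp
    FROM global_temp.{view_name}
    WHERE _change_type <> 'update_preimage'
    QUALIFY ROW_NUMBER() OVER (PARTITION BY {id_field} ORDER BY _commit_timestamp DESC, collect_date DESC) = 1
"""
df_upsert = spark.sql(query_cdf)

# Confirm the duplicate is gone before merging
df_upsert.printSchema()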

Regards,

Brahma

Hi @Brahmareddy, that works.

 

from delta.tables import DeltaTable
from pyspark.sql import functions as F


def __remove_cdf_columns(df, exclude_columns=['_change_type', '_commit_version', '_commit_timestamp']):
    # Build the target/source column mapping for the merge,
    # skipping the reserved CDF columns
    columns_list = sorted(df.columns)

    update_columns = {}
    for column_name in columns_list:
        if column_name not in exclude_columns:
            update_columns[f"target.{column_name}"] = f"source.{column_name}"

    return update_columns


def upsert_via_cdf(df, batch_id, target_table):

    id_field = 'sys_id_value'
    view_name = "view_" + target_table.replace('.', '_')

    # ------------------------------------------
    # Getting the modified records at bronze
    # ------------------------------------------
    df.createOrReplaceTempView(view_name)
    query_cdf = f"""
        SELECT *
        FROM {view_name}
        WHERE _change_type <> 'update_preimage'
        QUALIFY ROW_NUMBER() OVER (PARTITION BY {id_field} ORDER BY _commit_timestamp DESC, collect_date DESC) = 1
    """
    df_upsert = spark.sql(query_cdf)

    # --------------------------------------
    # Merge Upsert the records found
    # --------------------------------------
    # Map only the business columns, so the reserved CDF columns
    # never reach the target table
    columns = __remove_cdf_columns(df_upsert)

    deltadf = DeltaTable.forName(spark, target_table)
    (
        deltadf.alias("target")
        .merge(df_upsert.alias("source"), f"source.{id_field} = target.{id_field}")
        .whenMatchedUpdate(set=columns, condition=F.col("source._change_type") == 'update_postimage')
        .whenNotMatchedInsert(values=columns, condition=F.col("source._change_type") == 'insert')
        .execute()
    )
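The stream wiring stays the same as in my first post:

stream_query = (
    df.writeStream
    .option("checkpointLocation", checkpoint_relative_path)
    .foreachBatch(lambda batch_df, batch_id: upsert_via_cdf(batch_df, batch_id, target_table))
    .trigger(availableNow=True)
    .start()
)
stream_query.awaitTermination()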

Thanks.
