Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Auto loader from tables in Delta Share

dbuenosilva
New Contributor

Hello,

I am trying to read a Delta table from a Delta Share shared from another environment.

The pipeline runs okay; however, when the Delta table is updated at the source (a Delta Share in GCP), the code below fails with the error shown further down, unless I reset the checkpoint. I wonder whether, when reading Delta tables from a Delta Share, I can keep the checkpoint, so the same data is not written twice if the pipeline is executed twice.

        streaming_transactions = spark.readStream.format("delta") \
            .option("cloudFiles.format", "deltaSharing") \
            .table(f"{source_root_path}.{table_name}") \
            .selectExpr("*", *metadata)

        # 'mergeSchema' option enables schema evolution when writing
        # 'readChangeFeed' option tells Delta Lake to read the change data from the Delta table, rather than the full data

        streaming_transactions.writeStream.format("delta") \
            .partitionBy("retrieved_datetime") \
            .trigger(availableNow=True) \
            .option("checkpointLocation", checkpoint) \
            .option("readChangeFeed", "true") \
            .option("mergeSchema", "true") \
            .toTable(
                tableName=target_table_name,
                format="delta",
                outputMode="append",
                path=target_path
            )


com.databricks.sql.transaction.tahoe.DeltaUnsupportedOperationException: [DELTA_SOURCE_TABLE_IGNORE_CHANGES] Detected a data update (for example CREATE OR REPLACE TABLE AS SELECT (Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {"delta.enableDeletionVectors":"true"}, statsOnLoad -> false))) in the source table at version 8. This is currently not supported. If this is going to happen regularly and you are okay to skip changes, set the option 'skipChangeCommits' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory or do a full refresh if you are using DLT. If you need to handle these changes, please switch to MVs. The source table can be found at path gs://databricks.....




2 REPLIES

NandiniN
Databricks Employee

Checking.

NandiniN
Databricks Employee

The error you are encountering, DeltaUnsupportedOperationException: [DELTA_SOURCE_TABLE_IGNORE_CHANGES], occurs because your streaming job detected an update in the source Delta table, which is not supported for the type of source you have. A streaming source table is treated as append-only.

This error is triggered when the source table undergoes data updates (such as CREATE OR REPLACE or UPDATE), and the streaming process doesn't know how to handle them.

As mentioned in the error message, this operation is not supported. If you do not want the stream to be affected by these commits, you can skip them with .option("skipChangeCommits", "true"). Note: by enabling skipChangeCommits you might miss changes made to existing records in the source table, so downstream systems should be designed to handle such cases if necessary.
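As a sketch of what that would look like against the code in the original post (variable names such as source_root_path, table_name, checkpoint, and target_table_name are taken from the post; adjust to your environment):

```python
# Sketch: resume from the existing checkpoint while ignoring commits that
# update or delete existing rows in the shared source table, instead of
# failing with DELTA_SOURCE_TABLE_IGNORE_CHANGES. Only newly appended
# rows are streamed downstream.
streaming_transactions = (
    spark.readStream.format("delta")
    # Skip change commits (updates/deletes) rather than erroring out.
    .option("skipChangeCommits", "true")
    .table(f"{source_root_path}.{table_name}")
)

(
    streaming_transactions.writeStream.format("delta")
    .trigger(availableNow=True)
    # The checkpoint can now be kept across runs; availableNow processes
    # only data not yet recorded in the checkpoint.
    .option("checkpointLocation", checkpoint)
    .toTable(target_table_name)
)
```

This avoids resetting the checkpoint between runs, at the cost of not reflecting updates to already-ingested rows.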

Another suggestion: if data updates in the source table are regular and need to be propagated downstream, converting the source table access pattern to use Materialized Views is recommended. This ensures updates are handled flexibly, and the downstream system can process the changes efficiently.

 

You can run DESCRIBE HISTORY delta.`<table_path>` to check which operation was performed at version 8 of the source table and understand the failure further.
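For example, from a notebook (the table path placeholder is illustrative; substitute the gs:// path from your error message, assuming you have access to the source table's history):

```python
# Sketch: inspect the commit history of the source table to see which
# operation produced the data update at version 8 that broke the stream.
history = spark.sql("DESCRIBE HISTORY delta.`<table_path>`")

# Narrow down to the offending version and show the operation details.
history.select("version", "operation", "operationParameters") \
       .filter("version = 8") \
       .show(truncate=False)
```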

 
