Hello,
I am trying to read a Delta table from a Delta Share shared from another environment.
The pipeline runs okay; however, when the Delta table is updated in the source (a Delta Share in GCP), the code below fails with the error shown at the end unless I reset the checkpoint. I wonder whether, when reading Delta tables from a Delta Share, I can keep the checkpoint so that the same data is not written twice if the pipeline is executed twice.
streaming_transactions = spark.readStream.format("delta") \
.option("cloudFiles.format", "deltaSharing") \
.table(f"{source_root_path}.{table_name}") \
.selectExpr("*", *metadata)
# 'mergeSchema' option enables schema evolution when writing
# 'readChangeFeed' option tells Delta Lake to read the change data from the Delta table, rather than the full data.
streaming_transactions.writeStream.format("delta") \
.partitionBy(f"retrieved_datetime") \
.trigger(availableNow=True) \
.option("checkpointLocation", checkpoint) \
.option("readChangeFeed", "true") \
.option("mergeSchema", "true") \
.toTable(
tableName=target_table_name,
format="delta",
outputMode="append",
path=target_path
)
com.databricks.sql.transaction.tahoe.DeltaUnsupportedOperationException: [DELTA_SOURCE_TABLE_IGNORE_CHANGES] Detected a data update (for example CREATE OR REPLACE TABLE AS SELECT (Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {"delta.enableDeletionVectors":"true"}, statsOnLoad -> false))) in the source table at version 8. This is currently not supported. If this is going to happen regularly and you are okay to skip changes, set the option 'skipChangeCommits' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory or do a full refresh if you are using DLT. If you need to handle these changes, please switch to MVs. The source table can be found at path gs://databricks.....
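
Based on the error message, my understanding is that the only way to keep the existing checkpoint is to set 'skipChangeCommits' to 'true' on the streaming read, accepting that commits which update or delete existing rows are skipped rather than replayed. A minimal sketch of what I think that would look like, reusing the same variables as above (I have not confirmed this is the right approach for my case):

# Skip commits that rewrite existing rows in the shared source table so the
# stream can continue from the existing checkpoint instead of failing.
# Note: the skipped commits are not propagated downstream, so updated rows
# would be missing from the target table.
streaming_transactions = spark.readStream.format("delta") \
    .option("skipChangeCommits", "true") \
    .table(f"{source_root_path}.{table_name}") \
    .selectExpr("*", *metadata)

Is this the recommended way to handle source updates over Delta Sharing, or is there a way to pick up the changed rows without starting from a fresh checkpoint?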