Can you use the ignoreChanges option when you read your stream? The code would look something like this:
import dlt
from pyspark.sql.functions import col, expr

# Read the source table as a stream.
# ignoreChanges lets the stream read past update/delete commits in the
# source table instead of failing; rewritten files may be re-emitted downstream.
@dlt.view
def users():
    return (
        spark.readStream
            .format("delta")
            .option("ignoreChanges", "true")
            .table("cdc_data.users")
    )
# Create the target table and apply the CDC records from the users view,
# keyed by userId, ordered by sequenceNum, and treating DELETE operations as deletes.
dlt.create_target_table("target")

dlt.apply_changes(
    target = "target",
    source = "users",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    except_column_list = ["operation", "sequenceNum"]
)