Getting error when using CDC in delta live table

palzor
New Contributor III

Hi,

I am trying to use CDC with a Delta Live Table, and when I run the pipeline a second time I get an error:

org.apache.spark.sql.streaming.StreamingQueryException: Query tbl_cdc [id = ***-xx-xx-bf7e-6cb8b0deb690, runId = ***-xxxx-4031-ba74-b4b22be05774] terminated with exception: Detected a data update (for example part-00000-eedcf65d-3aa0.snappy.parquet) in the source table at version 2. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.

5 REPLIES

Hubert-Dudek
Esteemed Contributor III

@Palzor Lama, Structured Streaming supports only sources that append data. It seems there is an UPDATE, MERGE INTO, DELETE, or OVERWRITE operation on the source table.
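If it is not obvious which operation rewrote the source, the Delta transaction log will show it. A minimal sketch of how to check (the table name cdc_data.users is assumed here, matching the reply further down; substitute your actual source table):

# Inspect the Delta history to find the operation that produced version 2,
# the version the error message points at.
history = spark.sql("DESCRIBE HISTORY cdc_data.users")
(history
    .select("version", "operation", "operationParameters")
    .where("version = 2")
    .show(truncate=False))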

Hi @Palzor Lama, please let us know if this helps, or we'll find another answer for you.

palzor
New Contributor III

@Hubert Dudek, thanks for your answer. We are loading files, and when we run the pipeline for a new file that comes in, we get this error. So I think it's an append rather than an update.

ccary
New Contributor III

Can you use the ignoreChanges option when you read your stream? The code would look something like this:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
    # Stream the source Delta table; ignoreChanges prevents the stream from
    # failing when source files are rewritten by UPDATE, MERGE, or DELETE.
    return (
        spark.readStream
        .format("delta")
        .option("ignoreChanges", "true")
        .table("cdc_data.users")
    )

# Declare the target table that the changes will be applied into.
dlt.create_target_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],                        # key used to match source rows to target rows
  sequence_by = col("sequenceNum"),         # ordering column for out-of-order events
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"]   # drop CDC bookkeeping columns
)
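One caveat worth noting with this approach: ignoreChanges does not filter updates out, it only stops the stream from failing. When the source rewrites a file, all rows in that file (changed and unchanged) are re-emitted downstream, so consumers must tolerate duplicates. In the sketch above that is handled by apply_changes, which deduplicates on keys and keeps the latest row per userId according to sequence_by.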

jose_gonzalez
Moderator

Hi @Palzor Lama​,

A streaming live table can only process append queries; that is, queries where new rows are inserted into the source table. Processing updates from source tables (for example, merges and deletes) is not supported. To process updates, see the APPLY CHANGES INTO command. You can do what @Chris Cary recommended. For more information, check the docs here: https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-cdc.html#apply-chan...
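Not mentioned in this thread, but if you want the source's updates and deletes propagated rather than skipped, one alternative is to stream the table's change data feed, which surfaces every change as an append-only record. A sketch under the assumption that change data feed has been enabled on the source table:

# Sketch: stream the change data feed instead of the table itself.
# Assumes the property was set on the source beforehand, e.g.:
#   ALTER TABLE cdc_data.users SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
changes = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")  # adds _change_type, _commit_version, _commit_timestamp
    .table("cdc_data.users")
)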
