Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Getting error when using CDC in delta live table

palzor
New Contributor III

Hi,

I am trying to use CDC with a Delta Live Table, and when I run the pipeline a second time I get this error:

org.apache.spark.sql.streaming.StreamingQueryException: Query tbl_cdc [id = ***-xx-xx-bf7e-6cb8b0deb690, runId = ***-xxxx-4031-ba74-b4b22be05774] terminated with exception: Detected a data update (for example part-00000-eedcf65d-3aa0.snappy.parquet) in the source table at version 2. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.

4 REPLIES

Hubert-Dudek
Esteemed Contributor III

@Palzor Lama, Structured Streaming supports only sources that append data. It looks like an UPDATE, MERGE INTO, DELETE, or OVERWRITE operation ran on the source table.
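As a rough illustration of the point above, here is a toy model of how an append-only reader might classify a commit from its file actions. It is a simplified sketch, not Delta's actual implementation: the `classify_commit` function and the dictionary shape of each action are hypothetical, and the only assumption carried over from Delta is that a commit records files as added or removed, with a flag saying whether the data changed.

```python
def classify_commit(actions):
    """Classify a commit from its file actions (simplified, hypothetical model)."""
    # Only actions that change data matter; compaction-style rewrites are ignored.
    adds = any(a["type"] == "add" and a.get("dataChange", True) for a in actions)
    removes = any(a["type"] == "remove" and a.get("dataChange", True) for a in actions)
    if removes and adds:
        return "update"   # files rewritten: the case the error message describes
    if removes:
        return "delete"   # files removed without replacement
    return "append"       # nothing removed: safe for an append-only reader


print(classify_commit([{"type": "add"}]))
print(classify_commit([{"type": "remove"}, {"type": "add"}]))
```

In this model, loading a new file is a plain "append", while a MERGE or UPDATE that rewrites an existing Parquet file shows up as "update", which is the situation the error message points to.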

palzor
New Contributor III

@Hubert Dudek, thanks for your answer. We are loading files, and we get this error when we run the pipeline for a new file that comes in. So I think it's an append rather than an update.
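One way to check whether the commit at version 2 really was an append is to look at the source table's history (for example with `DESCRIBE HISTORY` on the source table) and flag any commit that is not a plain append. The helper below is a minimal sketch of that filtering step only. The function name, the sample rows, and the set of operation names are assumptions for illustration; the rows are shaped like history output, with `version`, `operation`, and `operationParameters` fields.

```python
from typing import Iterable, Mapping

# Operation names that rewrite existing data. This set is an assumption for
# illustration and is not an exhaustive list.
REWRITING_OPERATIONS = {"MERGE", "UPDATE", "DELETE"}


def non_append_commits(history: Iterable[Mapping]) -> list:
    """Return sorted (version, operation) pairs for commits that are not plain appends."""
    flagged = []
    for row in history:
        operation = row["operation"]
        mode = (row.get("operationParameters") or {}).get("mode", "")
        is_overwrite = operation == "WRITE" and mode.lower() == "overwrite"
        if operation in REWRITING_OPERATIONS or is_overwrite:
            flagged.append((row["version"], operation))
    return sorted(flagged)


# Hypothetical history rows, not taken from the poster's table.
sample_history = [
    {"version": 0, "operation": "CREATE TABLE", "operationParameters": {}},
    {"version": 1, "operation": "WRITE", "operationParameters": {"mode": "Append"}},
    {"version": 2, "operation": "MERGE", "operationParameters": {}},
]
print(non_append_commits(sample_history))  # [(2, 'MERGE')]
```

If the flagged list includes the version named in the error, the load for the new file is doing more than appending, even if it looks like an append from the pipeline's point of view.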

ccary
New Contributor III

Can you set the ignoreChanges option when you read your stream? The code would look something like this:

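import dlt
from pyspark.sql.functions import col, expr

# Read the source Delta table as a stream. With ignoreChanges set, the stream
# keeps running when existing files are rewritten, but the rewritten files are
# re-emitted, so unchanged rows can arrive downstream again.
@dlt.view
def users():
    return (
        spark.readStream
        .format("delta")
        .option("ignoreChanges", "true")
        .table("cdc_data.users")
    )

# Create the target table that apply_changes writes into.
dlt.create_target_table("target")

# Upsert by userId, ordered by sequenceNum; rows flagged DELETE remove the key.
dlt.apply_changes(
    target="target",
    source="users",
    keys=["userId"],
    sequence_by=col("sequenceNum"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "sequenceNum"],
)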

jose_gonzalez
Databricks Employee

Hi @Palzor Lama,

A streaming live table can only process append queries; that is, queries where new rows are inserted into the source table. Processing updates from source tables, such as merges and deletes, is not supported. To process updates, see the APPLY CHANGES INTO command. You can do what @Chris Cary recommended. For more information, see the docs here: https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-cdc.html#apply-chan...
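To make the idea behind APPLY CHANGES more concrete, here is a plain-Python sketch of the upsert logic it performs conceptually: events are ordered by a sequence column, the latest event per key wins, and delete events remove the key. This is a toy model under stated assumptions, not the Delta Live Tables implementation. The column names `userId`, `sequenceNum`, and `operation` follow the snippet earlier in this thread; the `name` column, the sample events, and the `apply_changes_sketch` function are hypothetical.

```python
def apply_changes_sketch(events, key="userId", sequence_by="sequenceNum"):
    """Toy upsert: apply events in sequence order, keeping the latest row per key."""
    target = {}
    for event in sorted(events, key=lambda e: e[sequence_by]):
        if event["operation"] == "DELETE":
            target.pop(event[key], None)
        else:
            # Drop the bookkeeping columns, as except_column_list does in the snippet.
            target[event[key]] = {
                c: v for c, v in event.items() if c not in ("operation", sequence_by)
            }
    return target


events = [
    {"userId": 1, "name": "Ann", "operation": "INSERT", "sequenceNum": 1},
    {"userId": 2, "name": "Bob", "operation": "INSERT", "sequenceNum": 2},
    {"userId": 1, "name": "Anna", "operation": "UPDATE", "sequenceNum": 3},
    {"userId": 2, "name": "Bob", "operation": "DELETE", "sequenceNum": 4},
    # A row delivered twice, as can happen when ignoreChanges re-emits a rewritten file.
    {"userId": 1, "name": "Anna", "operation": "UPDATE", "sequenceNum": 3},
]
result = apply_changes_sketch(events)
print(result)  # {1: {'userId': 1, 'name': 'Anna'}}
```

Because the result depends only on the key and the sequence value, a row that is delivered a second time leaves the target unchanged in this model, which is why pairing ignoreChanges with a keyed, sequenced upsert is a reasonable combination.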
