Getting error when using CDC in delta live table
04-20-2022 11:24 PM
Hi,
I am trying to use CDC for a Delta Live Table, and when I run the pipeline a second time I get this error:
org.apache.spark.sql.streaming.StreamingQueryException: Query tbl_cdc [id = ***-xx-xx-bf7e-6cb8b0deb690, runId = ***-xxxx-4031-ba74-b4b22be05774] terminated with exception: Detected a data update (for example part-00000-eedcf65d-3aa0.snappy.parquet) in the source table at version 2. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.
Labels: CDC
04-21-2022 02:40 AM
@Palzor Lama, Structured Streaming supports only sources that append data. It seems there is an UPDATE, MERGE INTO, DELETE, or OVERWRITE operation on the source table.
04-25-2022 03:41 PM
@Hubert Dudek, thanks for your answer. We are loading files, and when we run the pipeline for a new file that comes in, we get this error. So I think it's an append rather than an update.
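Before changing any read options, it may be worth confirming what operations actually hit the source table. A quick check, assuming a Databricks notebook with an active SparkSession (substitute your own source table for the `cdc_data.users` placeholder):

```python
# Show which Delta operation produced each version of the source table.
# A plain WRITE means an append; an UPDATE, MERGE, or OVERWRITE at version 2
# would explain the "Detected a data update" streaming error.
(spark.sql("DESCRIBE HISTORY cdc_data.users")
      .select("version", "timestamp", "operation")
      .show(truncate=False))
```

If the version named in the error shows anything other than an append-style operation, the source really was rewritten in place, and `ignoreChanges` (or restarting with a fresh checkpoint, i.e. a full refresh of the pipeline) is the way forward.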
05-14-2022 09:06 AM
Can you use the `ignoreChanges` option when you read your stream? The code would look something like:
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
    return (
        spark.readStream
            .format("delta")
            .option("ignoreChanges", "true")
            .table("cdc_data.users")
    )

dlt.create_target_table("target")

dlt.apply_changes(
    target = "target",
    source = "users",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    except_column_list = ["operation", "sequenceNum"]
)
06-01-2022 04:55 PM
Hi @Palzor Lama,
A streaming live table can only process append queries, that is, queries where new rows are inserted into the source table. Processing updates from source tables (for example, merges and deletes) is not supported. To process updates, see the APPLY CHANGES INTO command; you can do what @Chris Cary recommended. For more information, check the docs here: https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-cdc.html#apply-chan...

