You can define a CDC flow that updates the streaming table. This involves reading from the same table and applying the changes:
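import dlt

@dlt.table(
    name="my_streaming_table",
    comment="This table is updated using CDC",
    table_properties={"quality": "silver"},
)
# Drop any row where column_name is NULL.
@dlt.expect_or_drop("valid_data", "column_name IS NOT NULL")
def update_table():
    # Note: DLT may reject a table that reads from itself as a circular
    # dependency; if so, read from the upstream change source instead.
    source_df = dlt.read_stream("my_streaming_table")
    # Keep only the rows flagged as updates.
    changes_df = source_df.filter("change_type = 'update'")
    return changes_df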
Make sure your pipeline is configured to use the CDC functionality; you can set this in the pipeline settings.
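To make "applying changes" concrete, here is a minimal plain-Python sketch of what a CDC apply step does conceptually. It does not use DLT. The function name `apply_cdc_changes`, the key column `id`, and the ordering column `seq` are all hypothetical and chosen only for illustration; only `change_type` comes from the snippet above.

```python
def apply_cdc_changes(target, changes, key="id", seq="seq"):
    """Return a copy of `target` with the change records applied.

    `target` maps key -> row dict. Each change is a row dict with an extra
    "change_type" field ("insert", "update", or "delete"). The column names
    `id` and `seq` are assumptions made for this sketch.
    """
    result = {k: dict(row) for k, row in target.items()}
    # Remember the newest sequence value seen per key so that late-arriving,
    # older records do not overwrite newer data.
    last_seq = {k: row[seq] for k, row in result.items()}

    # Process changes in sequence order, regardless of arrival order.
    for change in sorted(changes, key=lambda c: c[seq]):
        k = change[key]
        if k in last_seq and change[seq] <= last_seq[k]:
            continue  # stale record, skip it
        last_seq[k] = change[seq]
        if change["change_type"] == "delete":
            result.pop(k, None)
        else:
            # Inserts and updates are both treated as upserts.
            result[k] = {c: v for c, v in change.items() if c != "change_type"}
    return result


target = {1: {"id": 1, "name": "a", "seq": 1}}
changes = [
    {"id": 1, "name": "b", "seq": 3, "change_type": "update"},
    {"id": 2, "name": "c", "seq": 2, "change_type": "insert"},
    {"id": 1, "name": "stale", "seq": 0, "change_type": "update"},
    {"id": 2, "seq": 4, "change_type": "delete"},
]
updated = apply_cdc_changes(target, changes)
```

The sketch keys rows by `id` and uses `seq` to decide which record wins, so out-of-order records cannot overwrite newer ones. A managed CDC feature would handle this bookkeeping for you; the sketch only shows the idea.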