Hey folks, I'm trying to implement CDC (apply changes) from one Delta table to another in Databricks. The source is a Delta table named table_latest and the target is another Delta table named table_old. I'm trying to cascade the incremental changes from table_latest to table_old using Delta Live Tables (DLT). Below is the code I'm using:
import dlt
from pyspark.sql.functions import col

@dlt.table(name="source_table_dlt")
def source_table():
    return spark.read.format("delta").table("table_latest")

@dlt.table(name="target_table_dlt")
def target_table():
    return spark.read.format("delta").table("table_old")

dlt.apply_changes(
    target="target_table_dlt",
    source="source_table_dlt",
    keys=["id"],
    sequence_by=col("import_date"),
)
The notebook itself validates successfully, but when I run the Delta Live Tables pipeline I get the following error:
AnalysisException: Cannot have multiple queries named `target_table` for `target_table`. Additional queries on that table must be named. Note that unnamed queries default to the same name as the table.,None,Map(),Map(),List(),List(),Map()).
Am I missing something fundamental here? Please help.
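Edit: reading the DLT docs again, I suspect the issue is that I define target_table_dlt twice, once as a @dlt.table with its own query and once as the target of apply_changes, which would explain the "multiple queries" message. My understanding is that the target should instead be declared once with dlt.create_streaming_table (no query attached), and that the source for apply_changes should be read as a stream. A sketch of what I think the intended pattern looks like, using my table names above (untested, so please correct me if this is wrong):

```python
import dlt
from pyspark.sql.functions import col

# Source view: read the latest snapshot as a stream so apply_changes can consume it.
@dlt.view(name="source_table_dlt")
def source_table():
    return spark.readStream.format("delta").table("table_latest")

# Declare the CDC target exactly once, with no query of its own;
# apply_changes below is the only thing that writes to it.
dlt.create_streaming_table("target_table_dlt")

dlt.apply_changes(
    target="target_table_dlt",
    source="source_table_dlt",
    keys=["id"],
    sequence_by=col("import_date"),
)
```

Is dropping the @dlt.table definition of the target in favor of dlt.create_streaming_table the right fix here?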