09-21-2024 06:41 AM - edited 09-21-2024 06:48 AM
Hey Folks, I'm trying to implement CDC (apply changes) from one Delta table to another. The source is a Delta table named table_latest and the target is another Delta table named table_old; both are Delta tables in Databricks. I'm trying to cascade the incremental changes from table_latest to table_old using DLT. Below is the code I'm using:
09-21-2024 09:51 PM
Hi, thanks for your response - I tried doing the above, and I assume my apply_changes call should look like:
09-22-2024 01:21 AM - edited 09-22-2024 01:39 AM
Hi @aniruth1000 ,
In a Delta Live Tables (DLT) pipeline, only the source can be a pre-existing Delta table.
The target table must be fully managed by the DLT pipeline, including its creation and lifecycle.
Let's say that you modified the code as suggested by @gchandra, so that it looks like the code below:
import dlt
from pyspark.sql.functions import col

# Expose the existing Delta table table_latest as a DLT dataset.
@dlt.table(name="source_table_dlt")
def source_table():
    return spark.read.format("delta").table("table_latest")

# Declare the CDC target; DLT expects to create and manage this table itself.
dlt.create_streaming_table("table_old")

# Merge changes from the source into the target, ordered by import_date.
dlt.apply_changes(
    target="table_old",
    source="source_table_dlt",
    keys=["id"],
    sequence_by=col("import_date")
)
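As a side note (not from the thread): the batch spark.read above recomputes from all of table_latest on every pipeline update. If you want the source consumed incrementally instead, one possible sketch is a streaming view; the view name is made up here, and the skipChangeCommits option is an assumption that only makes sense if skipping update/delete commits in table_latest is acceptable for your use case:

# Hypothetical streaming source view (name assumed).
@dlt.view(name="source_table_dlt_stream")
def source_table_stream():
    # Streaming read: each pipeline update only processes new commits to table_latest.
    # skipChangeCommits ignores commits that update or delete existing rows, which is
    # only acceptable if table_latest is effectively append-only for your purposes.
    return spark.readStream.option("skipChangeCommits", "true").table("table_latest")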
The requirement is that the target must not be an existing Delta table that was created outside of DLT.
If a table with the given name (table_old) already exists as a managed Delta table that was not created by DLT, DLT will throw an error because it cannot take over the management of an existing managed table it did not create. This is what is happening in your case.
How to solve it?
The requirements:
1. Your target table will be loaded with data from "table_latest" on a regular basis
2. Your target table must also contain data from "table_old"
The steps:
1. Create a DLT pipeline as above
2. Change the target table to a different table name, like "table_target"
3. Run a one-time data backfill from table_old into the new target, as described in the docs (a rough sketch follows below).
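As a rough sketch of step 3 (assumptions: the flow name is made up, and this follows the append-flow backfill pattern from the DLT docs rather than anything specific from this thread), the one-time load of the historical rows from table_old into the new DLT-managed target could look like:

import dlt

# Hypothetical one-time backfill flow: appends the historical rows from the
# existing table_old into the DLT-managed target created in step 2.
# The streaming read is checkpointed, so existing rows are processed only once;
# the flow can be removed once the backfill has completed.
@dlt.append_flow(target="table_target")
def backfill_from_table_old():
    return spark.readStream.table("table_old")

After the backfill has run, subsequent pipeline updates continue to apply only the incremental changes coming from table_latest.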