cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Delta Live Tables - CDC - Batching - Delta Tables

aniruth1000
New Contributor II

Hey Folks, I'm trying to implement CDC - Apply changes from one delta table to another. Source is  a delta table named table_latest and target is another delta table named table_old. Both are delta tables in databricks. Im trying to cascade the incremental changes from table_latest to table_old using DLT.  Below is my code Im using:

 

import dlt
from pyspark.sql.functions import col


@dlt.table(name="source_table_dlt")
def source_table():
    return (
        spark.read.format("delta").table("table_latest")
    )

@dlt.table(name="target_table_dlt")
def target_table():
    return (
        spark.read.format("delta").table("table_old")
    )


dlt.apply_changes(
    target = "target_table_dlt",
    source = "source_table_dlt",
    keys=["id"],
    sequence_by= col("import_date"))


The source code seems to run successfully. But when I run the Delta Live Table  pipeline I get the following error: 

AnalysisException: Cannot have multiple queries named `target_table` for `target_table`. Additional queries on that table must be named. Note that unnamed queries default to the same name as the table.,None,Map(),Map(),List(),List(),Map()).

Am I missing something fundamental here ? Please help.
1 ACCEPTED SOLUTION

Accepted Solutions

filipniziol
Contributor

Hi @aniruth1000 ,

When using delta live table pipelines, only the source table can be the delta table.
The target table must be fully managed by the DLT pipeline, including its creation and lifecycle.

Let's say that you modified the code as suggested by @gchandra, and your code looks like below:

 

import dlt
from pyspark.sql.functions import col


@dlt.table(name="source_table_dlt")
def source_table():
    return (
        spark.read.format("delta").table("table_latest")
    )

dlt.create_streaming_table("table_old")

dlt.apply_changes(
    target = "target_old",
    source = "source_table_dlt",
    keys=["id"],
    sequence_by= col("import_date"))

 

The requirement is that target is not an existing delta table not created by DLT.

If a table the given name (target_old) name already exists as a managed Delta table (not created and managed by DLT), DLT will throw an error because it cannot take over the management of an existing managed table not created by it. This is what is happening in your case.

How to solve it?

The requirements:

1. Your target table will be loaded with data from "table_latest" on a regular basis

2. Your target table must also contain data from "table_old"

The steps:

1. Create a dlt pipeline as above

2. Change the target table to a different table name, like "table_target"

3. Run a one-time data-backfill from table_old as described in the docs.

 

View solution in original post

3 REPLIES 3

gchandra
Databricks Employee
Databricks Employee
Replace the target block with this and test
 
dlt.create_streaming_table("table_old")


~

Hi,  Thanks for your response - I tried doing the above and I assume my apply changes should  look like: 

dlt.apply_changes(
    target = "table_old", #<-- this is the actual delta table 
    source = "source_table_dlt",
    keys=["id"],
    sequence_by= col("import_date"))
 
When I run the pipeline, I get the following error: 

com.databricks.pipelines.common.errors.DLTAnalysisException: Could not materialize 'table_old' because a MANAGED table already exists with that name.
 
Am I getting this because table_old is a delta table and not a delta live table ? Appreciate any help in resolving this. TIA.
 


filipniziol
Contributor

Hi @aniruth1000 ,

When using delta live table pipelines, only the source table can be the delta table.
The target table must be fully managed by the DLT pipeline, including its creation and lifecycle.

Let's say that you modified the code as suggested by @gchandra, and your code looks like below:

 

import dlt
from pyspark.sql.functions import col


@dlt.table(name="source_table_dlt")
def source_table():
    return (
        spark.read.format("delta").table("table_latest")
    )

dlt.create_streaming_table("table_old")

dlt.apply_changes(
    target = "target_old",
    source = "source_table_dlt",
    keys=["id"],
    sequence_by= col("import_date"))

 

The requirement is that target is not an existing delta table not created by DLT.

If a table the given name (target_old) name already exists as a managed Delta table (not created and managed by DLT), DLT will throw an error because it cannot take over the management of an existing managed table not created by it. This is what is happening in your case.

How to solve it?

The requirements:

1. Your target table will be loaded with data from "table_latest" on a regular basis

2. Your target table must also contain data from "table_old"

The steps:

1. Create a dlt pipeline as above

2. Change the target table to a different table name, like "table_target"

3. Run a one-time data-backfill from table_old as described in the docs.

 

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group