cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Delta Live Tables CDC doubts

Llop
New Contributor II

We are trying to migrate to Delta Live Tables an Azure Data Factory pipeline which loads CSV files and outputs Delta Tables in Databricks.

The pipeline is triggered on demand via an external application which places the files in a Storage folder and then the pipeline runs and processes them. Files contain transactions, and our intention is that these transactions are deleted in the last layer and replaced by the new data per Company and Period fields.

We try in many ways to achieve that, but don't understand well function apply_changes. In all the examples and documentations it is assumed the old data is included along with the new one, which is not the case. We want to delete whatever is in this last layer and replace it by the data in the new arriving file but have come across different errors.

We thought let's encapsulate within a method reading the new data from stream and the last table in a data frame and combine both with something like the following. Flagging both datasets so that apply_changes knows what to delete from the gld layer:

def create_tmp_my_table():    
    @dlt.table(name="tmp_my_table",
               temporary=True,
               table_properties={"quality": "gold",
                                 "delta.minReaderVersion" : "2",
                                 "delta.minWriterVersion" : "5"})
 
def tmp_my_table():
    # Assuming both are exactly same schema
    df_old = dlt.read("slv_my_old_data").withColumn('DeleteFlag',lit(1))
    df_stream = dlt.read_stream("slv_my_new_data").withColumn('DeleteFlag',lit(0))
    df_final = df_stream.union(df_old)
    return (df_final)
 
create_tmp_my_table()
 
dlt.create_target_table(name="gld_my_table",
                        table_properties={"quality": "gold",
                                          "delta.minReaderVersion" : "2",
                                          "delta.minWriterVersion" : "5"})
 
dlt.apply_changes(
  target = "gld_my table",
  source = "tmp_my table",
  keys = ["Hash_Key"],
  apply_as_deletes = expr("DeleteFlag = 1"), #DELETE condition
  sequence_by = col("Load_Date")
)

Failed to start stream tmp_cash_flows in either append mode or complete mode. Append mode error: Union between streaming and batch DataFrames/Datasets is not supported

Then we thought ok, let's read only the old batch data and see if at least we're able to delete:

def create_tmp_my_table():    
    @dlt.table(name="tmp_my_table",
               temporary=True,
               table_properties={"quality": "gold",
                                 "delta.minReaderVersion" : "2",
                                 "delta.minWriterVersion" : "5"})
 
def tmp_my_table():
     # That's the previous persisted table not the stream
    df = dlt.read("slv_my_old_data").withColumn('DeleteFlag',lit(1))
    return (df)
 
create_tmp_my_table()
 
dlt.create_target_table(name="gld_my_table",
                        table_properties={"quality": "gold",
                                          "delta.minReaderVersion" : "2",
                                          "delta.minWriterVersion" : "5"})
 
dlt.apply_changes(
  target = "gld_my table",
  source = "tmp_my table",
  keys = ["Hash_Key"],
  apply_as_deletes = expr("DeleteFlag = 1"), #DELETE condition
  sequence_by = col("Load_Date")
)

But a different error arises:

Detected a data update (for example xxxxxxx.snappy.parquet) in the source table at version 25. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory

This comes added with more challenges which still are thinking about how to handle but will not expose here in order not to mix.

So how should rip and replace be properly handled in Delta Live Tables? Does anyone has a good example somewhere?

1 REPLY 1

Anonymous
Not applicable

@Enric Llop​ :

When using Delta Live Tables to perform a "rip and replace" operation, where you want to replace the existing data in a table with new data, there are a few things to keep in mind.

First, the apply_changes function is used to apply changes from a source table to a target table. The source table can be a Delta table or a streaming DataFrame, while the target table must be a Delta table. To perform a "rip and replace" operation, you can create a temporary table that contains only the new data, and then use apply_changes with the apply_as_delete option to delete the existing data in the target table before inserting the new data. Here's an example:

from delta.tables import *
from pyspark.sql.functions import lit
 
# Create a temporary table with only the new data
new_data = DeltaTable.forPath(spark, "path/to/new/data")
new_data.createOrReplaceTempView("tmp_new_data")
tmp_new_data = spark.sql("SELECT *, 0 as DeleteFlag FROM tmp_new_data")
 
# Create the target table if it doesn't exist
target_table = DeltaTable.forPath(spark, "path/to/target/table")
target_table.createOrReplaceTempView("tmp_target_table")
 
# Apply the changes to the target table, deleting the existing data
# and inserting the new data
target_table.alias("target").merge(
    tmp_new_data.alias("source"),
    "target.Hash_Key = source.Hash_Key"
).whenMatchedDelete(condition="source.DeleteFlag = 1").whenNotMatchedInsertAll().execute()

In this example, we create a temporary table tmp_new_data that contains only the new data, with a DeleteFlag column set to 0. We also create a temporary view tmp_target_table that refers to the target table. We then use the merge function to apply changes from tmp_new_data to tmp_target_table. The whenMatchedDelete option deletes any rows from the target table where the DeleteFlag is 1, effectively removing the existing data. The whenNotMatchedInsertAll option inserts all rows from tmp_new_data that don't match any rows in the target table.

Note that this example assumes that the Hash_Key column is used as the primary key for the table. You may need to adjust the join condition in the merge function to match the columns in your table. Also note that the merge function is a Delta-specific operation and may not work with other types of tables.

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.