We are trying to migrate to Delta Live Tables an Azure Data Factory pipeline that loads CSV files and outputs Delta tables in Databricks.
The pipeline is triggered on demand by an external application, which drops the files in a storage folder; the pipeline then runs and processes them. The files contain transactions, and the intention is that the transactions already in the last layer are deleted and replaced by the new data, per the Company and Period fields.
We have tried to achieve this in many ways, but we don't understand the apply_changes function well. All the examples and documentation assume that the old data arrives together with the new data, which is not our case: we want to delete whatever is in this last layer for the incoming Company/Period values and replace it with the data in the newly arriving file, but we keep running into different errors.
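To make the intent concrete, here is a minimal sketch of the behaviour we are after, written as plain Delta operations outside of DLT (table and column names follow the examples further down; this only illustrates the intent, it is not code we run):

from delta.tables import DeltaTable

def rip_and_replace(spark, new_df):
    """Delete existing rows for every Company/Period present in the new file,
    then append the rows from that file."""
    target = DeltaTable.forName(spark, "gld_my_table")
    # Remove the existing transactions for each Company/Period in the new file
    for row in new_df.select("Company", "Period").distinct().collect():
        target.delete(f"Company = '{row.Company}' AND Period = '{row.Period}'")
    # Then insert the new transactions
    new_df.write.format("delta").mode("append").saveAsTable("gld_my_table")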
Our first thought was to encapsulate in a method the new data read from the stream and the current table read into a DataFrame, and combine the two, flagging both datasets so that apply_changes knows what to delete from the gld layer, with something like the following:
import dlt
from pyspark.sql.functions import col, expr, lit

def create_tmp_my_table():
    @dlt.table(name="tmp_my_table",
               temporary=True,
               table_properties={"quality": "gold",
                                 "delta.minReaderVersion": "2",
                                 "delta.minWriterVersion": "5"})
    def tmp_my_table():
        # Assuming both have exactly the same schema
        df_old = dlt.read("slv_my_old_data").withColumn("DeleteFlag", lit(1))
        df_stream = dlt.read_stream("slv_my_new_data").withColumn("DeleteFlag", lit(0))
        df_final = df_stream.union(df_old)
        return df_final

create_tmp_my_table()

dlt.create_target_table(name="gld_my_table",
                        table_properties={"quality": "gold",
                                          "delta.minReaderVersion": "2",
                                          "delta.minWriterVersion": "5"})

dlt.apply_changes(
    target = "gld_my_table",
    source = "tmp_my_table",
    keys = ["Hash_Key"],
    apply_as_deletes = expr("DeleteFlag = 1"),  # DELETE condition
    sequence_by = col("Load_Date")
)
This fails with:

Failed to start stream tmp_cash_flows in either append mode or complete mode. Append mode error: Union between streaming and batch DataFrames/Datasets is not supported
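As we understand that error, Structured Streaming does not allow unioning a streaming DataFrame with a batch one at all, so the union form would need both inputs read as streams, roughly as in the sketch below. We are not sure that actually helps, though, since slv_my_old_data is itself rewritten on every run:

# Sketch: the same union, but with both silver tables read as streams
df_old = dlt.read_stream("slv_my_old_data").withColumn("DeleteFlag", lit(1))
df_new = dlt.read_stream("slv_my_new_data").withColumn("DeleteFlag", lit(0))
df_final = df_new.unionByName(df_old)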
Then we thought: OK, let's read only the old batch data and see whether we can at least delete:
def create_tmp_my_table():
    @dlt.table(name="tmp_my_table",
               temporary=True,
               table_properties={"quality": "gold",
                                 "delta.minReaderVersion": "2",
                                 "delta.minWriterVersion": "5"})
    def tmp_my_table():
        # This is the previously persisted table, not the stream
        df = dlt.read("slv_my_old_data").withColumn("DeleteFlag", lit(1))
        return df

create_tmp_my_table()

dlt.create_target_table(name="gld_my_table",
                        table_properties={"quality": "gold",
                                          "delta.minReaderVersion": "2",
                                          "delta.minWriterVersion": "5"})

dlt.apply_changes(
    target = "gld_my_table",
    source = "tmp_my_table",
    keys = ["Hash_Key"],
    apply_as_deletes = expr("DeleteFlag = 1"),  # DELETE condition
    sequence_by = col("Load_Date")
)
But a different error arises:
Detected a data update (for example xxxxxxx.snappy.parquet) in the source table at version 25. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory
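For reference, ignoreChanges (and its newer replacement skipChangeCommits) are Delta streaming-read options, and as far as we can tell from the docs, in DLT Python they would be set on a streaming read of a table along these lines. This is only a sketch of how we read the hint; we have not got it working, which is part of the question:

# Sketch: where we think the option from the error hint would go
@dlt.table(name="tmp_my_table", temporary=True)
def tmp_my_table():
    return (
        spark.readStream
             .option("skipChangeCommits", "true")   # "ignoreChanges" on older runtimes
             .table("LIVE.slv_my_old_data")
             .withColumn("DeleteFlag", lit(1))
    )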
This brings additional challenges that we are still thinking about how to handle, but we won't go into them here so as not to mix topics.
So how should rip-and-replace be properly handled in Delta Live Tables? Does anyone have a good example somewhere?