Hi!
I have a problem with incrementally processing data in my Delta Live Tables (DLT) pipeline. I have a raw table (in Delta format) to which new data is appended each day. When I run my DLT pipeline I only want the new data to be processed. As an example I made a pipeline with two notebooks; the first contains the following code:
import dlt

@dlt.table()
def VEHICLE_SALES_INVOICE_TRX():
    df = spark.sql(f"""
        WITH AllCompanies AS (
            SELECT
                TO_DATE( CAST( VI.Invoice_Date_KEY AS string ), 'yyyyMMdd' ) AS Invoice_Date_KEY,
                VI.Invoice_Number AS Invoice_Number,
                VI.Invoice_sequence AS Invoice_sequence,
                VI.Deliver_To_Customer_KEY AS Deliver_To_Customer_KEY,
                VI.Customer_KEY AS Customer_KEY,
                VI.SalesPerson_User_KEY AS SalesPerson_User_KEY,
                VI.Stock_Vehicle_KEY AS Stock_Vehicle_KEY,
                VI.SalesAmount
            FROM
                MAIN.RAW_DATA.EXTR_VEHICLE_INVOICE_part VI
            WHERE
                VI.Invoice_Sequence = 1
        )
        /*
        more transformation... (further CTEs, the last of which is named base)
        */
        SELECT
            Invoice_Date_KEY,
            Invoice_Number,
            Invoice_sequence,
            Deliver_To_Customer_KEY,
            Customer_KEY,
            SalesPerson_User_KEY,
            Stock_Vehicle_KEY,
            SalesAmount
        FROM
            base b
        """)
    return df
Then I have a second notebook in which I just do a SELECT * from the table above, again as a dlt.table(), roughly like the sketch below.
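A minimal sketch of that second notebook (the _STAGE2 name is just a placeholder; dlt.read resolves tables defined elsewhere in the same pipeline):

import dlt

@dlt.table()
def VEHICLE_SALES_INVOICE_TRX_STAGE2():
    # Batch read of the first table's full output within the same pipeline
    return dlt.read("VEHICLE_SALES_INVOICE_TRX")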
I ran the pipeline as a full refresh, then appended some new rows to the MAIN.RAW_DATA.EXTR_VEHICLE_INVOICE_part table (sketched below), and ran an update on the pipeline. But all the rows were processed again, not just the new ones.
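The append itself was just a plain INSERT, something like this (the column list and values here are illustrative, not my real data):

spark.sql("""
    INSERT INTO MAIN.RAW_DATA.EXTR_VEHICLE_INVOICE_part
        (Invoice_Date_KEY, Invoice_Number, Invoice_sequence,
         Deliver_To_Customer_KEY, Customer_KEY, SalesPerson_User_KEY,
         Stock_Vehicle_KEY, SalesAmount)
    VALUES (20240101, 'INV-1001', 1, 10, 10, 5, 99, 25000.00)
""")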
I also tried reading the raw table as a streaming source (see the sketch after the error message), but then I ran into an error when I appended rows to the Delta table and tried to run an update; the message was
"
An error occured because we detected an update or delete to one or more rows in the source table. Streaming tables
may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert your table
to a materialized view instead.
"
I tested the pipeline in both the Pro and Advanced product editions, and the Databricks runtime is 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12).
Does anyone have any insight into what I'm doing wrong? It's my understanding that Delta Live Tables should be able to handle this sort of incremental processing.
Kind regards,
Andreas