Issues with incremental data processing within Delta Live Tables

AndreasB2
New Contributor

Hi!

I have a problem with incrementally processing data in my Delta Live Tables (DLT) pipeline. I have a raw table (in Delta format) to which new data is added each day. When I run my DLT pipeline, I only want the new data to be processed. As an example, I made a pipeline with two notebooks; the first contains the following code:

import dlt

@dlt.table()
def VEHICLE_SALES_INVOICE_TRX():

  df = spark.sql(f"""
                                
    WITH AllCompanies as (
      SELECT
        TO_DATE( CAST( VI.Invoice_Date_KEY AS  string ),'yyyyMMdd' )                                AS Invoice_Date_KEY,
        VI.Invoice_Number                                                                           AS Invoice_Number,
        VI.Invoice_sequence                                                                         AS Invoice_sequence,	
        VI.Deliver_To_Customer_KEY                                                                  AS Deliver_To_Customer_KEY,
        VI.Customer_KEY                                                                             AS Customer_KEY,
        VI.SalesPerson_User_KEY                                                                     AS SalesPerson_User_KEY,
        VI.Stock_Vehicle_KEY                                                                        AS Stock_Vehicle_KEY,
        VI.SalesAmount                                                                              AS SalesAmount
      FROM
       MAIN.RAW_DATA.EXTR_VEHICLE_INVOICE_part VI
      WHERE 
       VI.Invoice_Sequence = 1
    )

    /*
    more transformation...
    */

    SELECT
      Invoice_Date_KEY,
      Invoice_Number,
      Invoice_sequence,	
      Deliver_To_Customer_KEY,
      Customer_KEY,
      SalesPerson_User_KEY,
      Stock_Vehicle_KEY,
      SalesAmount
    FROM
      base b

  """)
  return df


Then I have a second notebook, in which I define another dlt.table() that simply does a SELECT * from the table above.
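Roughly like this (the name of the downstream table here is just a placeholder):

import dlt

@dlt.table()
def VEHICLE_SALES_INVOICE_TRX_REPORT():
  # Plain SELECT * on the table defined in the first notebook
  return dlt.read("VEHICLE_SALES_INVOICE_TRX")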

I ran the pipeline as a full refresh, then appended some new rows to the MAIN.RAW_DATA.EXTR_VEHICLE_INVOICE_part table, and ran an update on the pipeline. But all the rows were still processed, not just the new ones.
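For context, the rows I add between pipeline updates go in as a plain Delta append, roughly like this (illustrative only: here I simply re-append a few existing rows so the schema matches, while the real job inserts freshly extracted invoices):

spark.sql("""
  INSERT INTO MAIN.RAW_DATA.EXTR_VEHICLE_INVOICE_part
  SELECT * FROM MAIN.RAW_DATA.EXTR_VEHICLE_INVOICE_part
  LIMIT 5
""")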
 
I also tried to define the raw table as a streaming table, but then I ran into an error when I appended rows to the Delta table and tried to run an update (a simplified sketch of that streaming attempt follows the error message). The message was:

"An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert your table to a materialized view instead."
 
I tested the pipeline in both the Pro and Advanced product editions, and the Databricks Runtime is 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12).
 
Does anyone have any insight into what I am doing wrong? It is my understanding that Delta Live Tables should be able to handle this sort of incremental processing.
 
Kind regards,
Andreas
 