Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Newbie learning DLT pipelines

ShreevaniRao
New Contributor II

Hello,

I am learning to create DLT pipelines with different graphs using a 14-day trial of premium Databricks. I currently have one graph: Mat View -> Streaming Table -> Mat View. When I ran this pipeline (serverless compute) the first time, it ran fine without any issues, but the second time I ran it without any change to the pipeline or any data asset (I just restarted the pipeline), I was expecting it to break because "the source of a streaming table has to be append only", but it didn't break or throw any errors.
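
For context, the shape of the graph is roughly like this (the table names and the source here are just placeholders, not my actual code):

import dlt

# Materialized view: recomputed on each update (placeholder source table)
@dlt.table(comment="Materialized view over a raw source")
def bronze_mv():
    return spark.read.table("samples.nyctaxi.trips")

# Streaming table: reads the materialized view above as a stream
@dlt.table(comment="Streaming table fed by the materialized view")
def silver_stream():
    return dlt.read_stream("bronze_mv")

# Materialized view: aggregates the streaming table
@dlt.table(comment="Materialized view aggregating the streaming table")
def gold_mv():
    return dlt.read("silver_stream").groupBy("pickup_zip").count()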

I wanted to know if this is unexpected behavior, or is there any implied setting in the pipeline that I need to learn about?

13 REPLIES

Brahmareddy
Honored Contributor III

Hi ShreevaniRao,

How are you doing today? As per my understanding, this actually makes sense based on how DLT works. When you restarted the pipeline without making any changes or adding new data, DLT likely didn't find anything new to process, so it quietly skipped over the steps that could have caused issues with non-append-only sources. That's why you didn't see any errors. This isn't unexpected; DLT is simply smart enough not to reprocess data or throw errors unless something actually changes. However, if you were to modify the data or the pipeline logic in a way that violates the append-only rule, it might still break in future runs. There's no hidden setting you missed, but if you want to experiment more, try updating the data to see how DLT reacts. Let me know if you'd like help testing that out!

Regards,

Brahma

Thank you, Brahma, for your response and explanation.

I am curious to try your suggestion by working with different data update scenarios to check how it behaves. Thanks for the offer to help with the testing; I will be in touch if I have further queries.

Have a great day!!

Regards.

Shreevani

Brahmareddy
Honored Contributor III

You're very welcome, Shreevani! I'm really glad the explanation helped, and it’s awesome that you're going to explore different update scenarios—that’s the best way to get hands-on insight into how everything behaves. Definitely feel free to reach out anytime if you run into anything unexpected or need help fine-tuning things. Wishing you all the best with your testing, and have a great day as well! 😊

Warm regards,
Brahma

Hello Brahma,

I wanted to check whether the error I am encountering is really due to using serverless compute for my DLT pipeline. I am trying to work with the Apply Changes API in a pipeline and seem to invariably run into a couple of issues: 1. the serverless compute issue, and 2. the quota exhaustion error (for a dedicated job compute).

The reason I am using serverless and not a dedicated job compute is that I was constantly getting error messages about an exhausted quota for my region (I'm on the 14-day premium trial), so I switched to serverless compute, but then I get the message below when starting my pipeline. My AI assistant says it's because serverless compute does not support the Apply Changes API? I would appreciate your input in this regard.

Error Msg -

pyspark.errors.exceptions.base.PySparkAttributeError: Traceback (most recent call last):
File "/Delta Live Tables/star_pipeline", cell 7, line 16, in scd_customers
.apply_changes(
^^^^^^^^^^^^^

pyspark.errors.exceptions.base.PySparkAttributeError: [ATTRIBUTE_NOT_SUPPORTED] Attribute `apply_changes` is not supported.

My Gold layer code -

 

@dlt.table(
    name="gold_customers",
    comment="SCD Type 2 implementation for customers using apply_changes",
    table_properties={"quality": "gold"}
)
def scd_customers():
    return (
        dlt.read("customers_silver")  
        .apply_changes(
            target="gold_customers",
            keys=["customer_id"],
            sequence_by="updated_at",  
            stored_as_scd_type=2
        )
    )

Brahmareddy
Honored Contributor III

Hi Shreevani,

As per my understanding, it’s happening because serverless compute doesn’t currently support the apply_changes() API in DLT pipelines. That method is part of advanced SCD (Slowly Changing Dimension) features, which are only available on dedicated job compute, not serverless. Since you’re using a trial workspace and running into quota limits, it makes sense that you switched to serverless, but unfortunately, this feature isn’t supported there. A good workaround would be to either request a temporary quota increase from Azure so you can try a dedicated cluster, or rewrite your logic using standard MERGE statements inside a DLT function to simulate SCD behavior. It’s a bit more manual but works well when you're limited by serverless capabilities. Let me know if you’d like help adjusting the code—happy to guide you!
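
Just for reference, if you do manage to get a dedicated cluster later, keep in mind that apply_changes is called as a module-level dlt function against a streaming target table, not as a DataFrame method. A rough sketch, reusing the names from your snippet:

import dlt

# Target streaming table that APPLY CHANGES will maintain
dlt.create_streaming_table("gold_customers")

# Module-level call: apply_changes is not a DataFrame method
dlt.apply_changes(
    target="gold_customers",
    source="customers_silver",
    keys=["customer_id"],
    sequence_by="updated_at",
    stored_as_scd_type=2,
)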

Regards,

Brahma

Thank you, Brahma, for your response. I tried requesting a quota increase, but it was not granted (for my subscription type). It is very frustrating and a hassle; I tried googling and came across others in the same situation. Azure could do better, maybe!!

I will now have to use the routine way to showcase SCD behavior; I was trying to use the latest approach to showcase in my portfolio project.

Thanks again!!

Hello Brahma,

Hope you are having a good day.

I took your suggestion to rewrite my logic to use MERGE to simulate SCD Type 2 in my DLT pipeline, and I need your help. I am keeping it simple as far as what changes I am testing in my customers table: it has an email column that I am changing, and I want to make sure my Dim Customers table tracks the history by updating the old record and inserting a new one. But I am not able to see the newly inserted record in my Dim Customers, although the update to the new email value is reflected. Below is my code for the SCD 2. Please let me know if you need more details.

Thanks!

 

Code for SCD 2 -

import dlt
from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit

@dlt.table(name="dim_customers", comment="Customer dimension with manual SCD Type 2")
def dim_customers():
    new_df = (dlt.read("customers_silver")
                .withColumn("start_date", col("updated_at"))  # Use source timestamp
                .withColumn("is_current", lit(True))
                .withColumn("end_date", lit(None).cast("timestamp"))
    )

    try:
        # Read the existing dimension table (dim_customers)
        delta_table = DeltaTable.forName(spark, "dim_customers")
    except Exception:
        # First run - no dim_customers yet
        return new_df

    # Perform MERGE operation to implement SCD Type 2
    merge_condition = """
        existing.customer_id = new.customer_id
        AND existing.is_current = true
        AND (
            existing.email <> new.email
        )
    """

    # We want to close old records when there is a change in data (email)
    # Apply the MERGE statement
    delta_table.alias("existing").merge(
        new_df.alias("new"),
        merge_condition
    ).whenMatchedUpdate(
        set={
            "is_current": lit(False),
            "end_date": col("new.updated_at")
        }
    ).whenNotMatchedInsertAll().execute()

    return delta_table.toDF()

Brahmareddy
Honored Contributor III

Hi Shreevani,

How are you doing today? As per my understanding, you're really close, and it's awesome that you're testing this out using MERGE for SCD Type 2 in your DLT pipeline. The main issue here is that you're performing the merge() operation inside the DLT table function, but DLT expects the function to return a DataFrame, not run an action like merge(). So while your existing records are getting updated (because merge() executes), the new records aren't being picked up in the returned result, which is why you don't see the inserted row in your dim_customers table. A better approach would be to move the merge logic outside the DLT table function, or, if you want to keep everything inside one function, you can skip merge() and instead manually filter the changed records and use unionByName() to return both the old and new versions. That keeps it all within DLT's expectations and still gets the SCD 2 behavior you want. Let me know if you'd like help writing that version - I'd be happy to help clean it up with you!
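
To illustrate the unionByName idea, here is a rough, untested sketch. The catalog/schema in the spark.read.table call are placeholders for wherever the previous run of dim_customers is materialized, and the same caveat about reading the table's own previous state applies here as in your merge version:

import dlt
from pyspark.sql import functions as F

@dlt.table(name="dim_customers", comment="Customer dimension, manual SCD Type 2 via unionByName")
def dim_customers():
    # New snapshot from the silver layer, stamped as the current version
    new_df = (
        dlt.read("customers_silver")
        .withColumn("start_date", F.col("updated_at"))
        .withColumn("end_date", F.lit(None).cast("timestamp"))
        .withColumn("is_current", F.lit(True))
    )

    try:
        # Previous materialization of the dimension (catalog/schema are placeholders)
        prev_df = spark.read.table("my_catalog.my_schema.dim_customers")
    except Exception:
        # First run: nothing to compare against yet
        return new_df

    history = prev_df.filter(F.col("is_current") == F.lit(False))
    current = prev_df.filter(F.col("is_current") == F.lit(True))

    # Keys whose tracked attribute (email) changed, with the timestamp of the change
    changed_keys = (
        current.alias("old")
        .join(new_df.alias("new"), "customer_id")
        .filter(F.col("old.email") != F.col("new.email"))
        .select("customer_id", F.col("new.updated_at").alias("change_ts"))
    )

    # Close the old versions of the changed customers
    closed = (
        current.join(changed_keys, "customer_id")
        .withColumn("is_current", F.lit(False))
        .withColumn("end_date", F.col("change_ts"))
        .drop("change_ts")
    )

    # Current rows with no change are carried over as-is
    unchanged = current.join(changed_keys.select("customer_id"), "customer_id", "left_anti")

    # Rows to insert: new versions of changed customers plus brand-new customers
    new_versions = new_df.join(unchanged.select("customer_id"), "customer_id", "left_anti")

    return history.unionByName(closed).unionByName(unchanged).unionByName(new_versions)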

Regards,

Brahma

Ah! Thank you, Brahma, for pointing it out. I will try it out.

Thanks for your guidance.

RiyazAli
Valued Contributor III

Hello @Brahmareddy & @ShreevaniRao - I wanted to clarify: if you run the pipeline a second time, my understanding is that DLT will check the source table checkpoints to determine whether there's any new data to be loaded; if not, you would see '0' records processed for each table in the pipeline. I couldn't understand why it would break - could you clarify or provide a doc for my understanding?

Riz

Hello Riyaz,

Sorry for the late response. I was learning about DLT pipeline building through online videos, and one of them showed the pipeline breaking on the 2nd run for Mat View -> Streaming Table -> Mat View, which made sense because "the source of a streaming table has to be append only" - which it is not here. I don't know why, but when I replicated it I couldn't break it :-). Maybe the runtimes were different? Hope this helps.

RiyazAli
Valued Contributor III

Thanks for your response @ShreevaniRao - I'm trying to solidify my understanding of DLT pipelines, hence the follow-up questions. As far as I can tell, a DLT pipeline would break if the source table is changed - maybe a schema change or some alteration other than appending data. If you haven't made such changes, the DLT pipeline simply checks whether any new data is present; if yes, the incremental data is processed, and if not, 0 records are populated in the dependent tables.
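
If you want to see that append-only rule in isolation (outside DLT), here is a quick sketch against a throwaway Delta table; the table name below is just a placeholder:

# Non-append change to the streaming source (placeholder table name)
spark.sql("UPDATE some_catalog.some_schema.people SET email = 'new@example.com' WHERE id = 1")

# A streaming read resuming from a checkpoint taken before that update will fail,
# because Delta streaming sources are expected to be append-only
df = spark.readStream.table("some_catalog.some_schema.people")

# Delta's escape hatch: skip commits that rewrite existing data and only process appended rows
df = spark.readStream.option("skipChangeCommits", "true").table("some_catalog.some_schema.people")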

Cheers!

Riz

Aviral-Bhardwaj
Esteemed Contributor III
Video explains - What are Delta Live Tables in Databricks? What is DLT pipeline? What is Streaming Table in DLT Pipeline? What is Materialized View in DLT pipeline? How to create a DLT pipeline? What is LIVE keyword in DLT pipeline? Difference between DLT Streaming table and Materialized View? ...
