Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta table update

databrciks
New Contributor III
Hi Experts
 
I have around 100 tables in the bronze layer (DLT pipeline). On top of that we have built a silver layer of around 20 tables based on some business logic.
How do I run a specific silver-layer flow whenever an update happens in the bronze layer?
 
Say, for example: if a silver flow uses t1, t2, t3, then trigger that silver flow when those tables are updated;
           if another flow uses t5, t11, t13, then trigger that flow when those tables are updated.
   
So there could be 50 table updates in the bronze layer, and I need to trigger only those flows in the silver layer where those 50 tables are used (could be 10 flows in the silver layer).
 
How do I design this flow? Please advise.

2 REPLIES

anuj_lathi
Databricks Employee

Hi, great question! This is a common pattern when you have a large medallion architecture with many bronze-to-silver dependencies. There are several approaches you can take, ranging from simple to more advanced.

---

Option 1: Single DLT Pipeline with Declarative Dependencies (Recommended)

The simplest and most elegant approach is to define both your bronze and silver layers in the same DLT pipeline (or use multiple pipelines with shared datasets). DLT is inherently declarative: if you define your silver tables as reading from bronze tables, DLT automatically handles the dependency graph and only processes what needs to be updated.

import dlt
from pyspark.sql.functions import col

# Bronze layer
@dlt.table(name="bronze_t1")
def bronze_t1():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")  # set to your actual source format
            .load("/data/t1/"))

@dlt.table(name="bronze_t2")
def bronze_t2():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")  # set to your actual source format
            .load("/data/t2/"))

# Silver layer: DLT knows this depends on bronze_t1 and bronze_t2
@dlt.table(name="silver_flow_1")
def silver_flow_1():
    t1 = dlt.read_stream("bronze_t1")
    t2 = dlt.read_stream("bronze_t2")
    return t1.join(t2, "key_col").filter(col("status") == "active")

Why this works: When you trigger the pipeline, DLT resolves the DAG. If bronze_t1 has new data, silver_flow_1 will process it. If bronze_t5 has no new data, any silver table depending only on bronze_t5 won't do unnecessary work: streaming tables will simply have no new records to process.

Tip: Run the pipeline in Triggered mode (process whatever is available, then stop) or Continuous mode, depending on your latency needs.
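For reference, triggered vs. continuous is a pipeline-level setting; a minimal sketch of the pipeline settings JSON (the name, notebook path, and target schema are illustrative):

```json
{
  "name": "bronze_silver_pipeline",
  "continuous": false,
  "libraries": [
    { "notebook": { "path": "/Repos/project/dlt_bronze_silver" } }
  ],
  "target": "silver"
}
```

With "continuous": false, the pipeline only processes data when you (or a schedule) trigger an update.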

---

Option 2: Separate Silver Pipelines Triggered via Databricks Workflows

If you need separate DLT pipelines per silver flow (for isolation, independent scheduling, or team ownership), you can orchestrate them using Databricks Workflows with dependencies:

Step 1: Create a Workflow where:

  • Task 1: Bronze DLT pipeline (ingests all 100 tables)
  • Task 2a: Silver Flow 1 DLT pipeline (depends on Task 1)
  • Task 2b: Silver Flow 2 DLT pipeline (depends on Task 1)
  • ...and so on

This ensures silver pipelines run after bronze completes. However, all silver pipelines will run every time.
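The task wiring above can be expressed in Jobs API JSON; a sketch (the task keys and pipeline IDs are placeholders):

```json
{
  "name": "medallion_orchestration",
  "tasks": [
    { "task_key": "bronze", "pipeline_task": { "pipeline_id": "<bronze-pipeline-id>" } },
    {
      "task_key": "silver_flow_1",
      "depends_on": [{ "task_key": "bronze" }],
      "pipeline_task": { "pipeline_id": "<silver-flow-1-pipeline-id>" }
    },
    {
      "task_key": "silver_flow_2",
      "depends_on": [{ "task_key": "bronze" }],
      "pipeline_task": { "pipeline_id": "<silver-flow-2-pipeline-id>" }
    }
  ]
}
```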

Step 2: To make it selective (only trigger silver flows whose source bronze tables changed), add a lightweight check:

# In a notebook task that runs before each silver pipeline
def check_bronze_tables_changed(bronze_tables, since_timestamp):
    """Check if any of the specified bronze tables have new commits since the last run."""
    for table in bronze_tables:
        # DESCRIBE HISTORY returns commits newest-first
        latest = spark.sql(f"DESCRIBE HISTORY {table} LIMIT 1").select("timestamp").first()
        if latest is not None and latest["timestamp"] > since_timestamp:
            return True
    return False

# Example: Silver Flow 1 depends on t1, t2, t3
tables_changed = check_bronze_tables_changed(
    ["catalog.bronze.t1", "catalog.bronze.t2", "catalog.bronze.t3"],
    last_run_timestamp,  # a datetime tracked in a control table or job parameter
)

if not tables_changed:
    dbutils.notebook.exit("SKIP - no upstream changes")

 

Then use the If/else condition task in Workflows (or use dbutils.notebook.exit() return values) to conditionally run or skip the downstream silver DLT pipeline.
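If you go the task-values route, the skip decision itself is plain Python and easy to unit-test; a sketch, where the task-value key and the job wiring are illustrative:

```python
def should_run(changed_tables, dependencies):
    """Pure decision: run the silver flow iff any of its source tables changed."""
    changed = set(changed_tables)
    return any(t in changed for t in dependencies)

decision = should_run(
    ["catalog.bronze.t1", "catalog.bronze.t7"],
    ["catalog.bronze.t1", "catalog.bronze.t2", "catalog.bronze.t3"],
)

try:
    # On Databricks, a downstream If/else condition task can reference
    # {{tasks.<this_task_key>.values.run_silver_flow_1}}
    dbutils.jobs.taskValues.set(key="run_silver_flow_1", value=decision)  # noqa: F821
except NameError:
    pass  # dbutils exists only inside a Databricks job run
```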

---

Option 3: Event-Driven with Delta Change Data Feed + Lakeflow Jobs

For a truly event-driven architecture:

Step 1: Enable Change Data Feed (CDF) on your bronze tables:

ALTER TABLE catalog.bronze.t1
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

 

Step 2: Use a Lakeflow/Workflow trigger: configure a File Arrival trigger or use Databricks Asset Bundles to set up event-based triggers. When new files arrive in bronze source locations, only the relevant pipeline fires.

Step 3: Or use a dispatcher notebook that reads the change data from bronze tables, maintains a mapping of bronze table to silver pipeline, and triggers only the relevant silver pipelines via the Jobs API:

import requests

# Mapping: which silver pipeline (job) depends on which bronze tables
DEPENDENCY_MAP = {
    "silver_pipeline_1_job_id": [
        "catalog.bronze.t1", "catalog.bronze.t2", "catalog.bronze.t3"
    ],
    "silver_pipeline_2_job_id": [
        "catalog.bronze.t5", "catalog.bronze.t11", "catalog.bronze.t13"
    ],
}

# Every bronze table referenced by any silver flow
ALL_BRONZE_TABLES = sorted({t for deps in DEPENDENCY_MAP.values() for t in deps})

def get_changed_tables(since_version):
    """Identify which bronze tables have changed since the given version."""
    changed = set()
    for table in ALL_BRONZE_TABLES:
        try:
            changes = (spark.read.format("delta")
                .option("readChangeFeed", "true")
                .option("startingVersion", since_version)
                .table(table))
            if changes.limit(1).count() > 0:
                changed.add(table)
        except Exception:
            pass  # e.g. CDF not yet enabled on this table
    return changed

# In practice, track the last-processed version per table in a control table
changed_tables = get_changed_tables(last_processed_version)

# Trigger only the silver pipelines whose source tables changed
for job_id, dependencies in DEPENDENCY_MAP.items():
    if any(t in changed_tables for t in dependencies):
        response = requests.post(
            f"{host}/api/2.1/jobs/run-now",
            headers={"Authorization": f"Bearer {token}"},
            json={"job_id": int(job_id)}
        )
        print(f"Triggered job {job_id}: {response.status_code}")
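With many silver flows, it can help to invert the dependency map once so each changed table looks up its dependent jobs directly, instead of scanning every flow's dependency list. A small, testable sketch (the job IDs are the same placeholders as above):

```python
from collections import defaultdict

DEPENDENCY_MAP = {
    "silver_pipeline_1_job_id": ["catalog.bronze.t1", "catalog.bronze.t2", "catalog.bronze.t3"],
    "silver_pipeline_2_job_id": ["catalog.bronze.t5", "catalog.bronze.t11", "catalog.bronze.t13"],
}

def invert(dep_map):
    """Build table -> {job_ids}, so each changed table is a direct lookup."""
    table_to_jobs = defaultdict(set)
    for job_id, tables in dep_map.items():
        for table in tables:
            table_to_jobs[table].add(job_id)
    return dict(table_to_jobs)

def jobs_to_trigger(changed_tables, table_to_jobs):
    """Union of all jobs that depend on any changed table."""
    jobs = set()
    for table in changed_tables:
        jobs |= table_to_jobs.get(table, set())
    return jobs

TABLE_TO_JOBS = invert(DEPENDENCY_MAP)
print(sorted(jobs_to_trigger({"catalog.bronze.t2", "catalog.bronze.t13"}, TABLE_TO_JOBS)))
# prints ['silver_pipeline_1_job_id', 'silver_pipeline_2_job_id']
```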

 

---

Summary and Recommendation

 

  • Option 1: Single DLT Pipeline. Complexity: Low. Best for: when all bronze+silver can live in one pipeline; DLT handles the DAG natively.
  • Option 2: Workflows + Conditional Tasks. Complexity: Medium. Best for: when silver pipelines must be separate but you want a simple orchestration layer.
  • Option 3: Event-Driven (CDF + Dispatcher). Complexity: Higher. Best for: when you need true event-driven, minimal-compute triggering at scale.

My recommendation: Start with Option 1 if possible. DLT's declarative model is built exactly for this use case: you define what each silver table reads from, and DLT figures out when and what to process. If you need pipeline isolation for operational reasons, go with Option 2 using DESCRIBE HISTORY checks, which is straightforward to implement and maintain.

Hope this helps! Let me know if you have questions about any of these approaches.

Anuj Lathi
Solutions Engineer @ Databricks

databrciks
New Contributor III

Thanks @anuj_lathi for the detailed explanation. This helps a lot.