Hi, great question! This is a common pattern when you have a large medallion architecture with many bronze-to-silver dependencies. There are several approaches you can take, ranging from simple to more advanced.
---
Option 1: Single DLT Pipeline with Declarative Dependencies (Recommended)
The simplest and most elegant approach is to define both your bronze and silver layers in the same DLT pipeline (or use multiple pipelines with shared datasets). DLT is inherently declarative: if you define your silver tables as reading from bronze tables, DLT automatically handles the dependency graph and only processes what needs to be updated.
```python
import dlt
from pyspark.sql.functions import col

# Bronze layer
@dlt.table(name="bronze_t1")
def bronze_t1():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")  # Auto Loader requires the source format
            .load("/data/t1/"))

@dlt.table(name="bronze_t2")
def bronze_t2():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load("/data/t2/"))

# Silver layer: DLT knows this depends on bronze_t1 and bronze_t2
@dlt.table(name="silver_flow_1")
def silver_flow_1():
    t1 = dlt.read_stream("bronze_t1")
    t2 = dlt.read_stream("bronze_t2")
    return t1.join(t2, "key_col").filter(col("status") == "active")
```
Why this works: When you trigger the pipeline, DLT resolves the DAG. If bronze_t1 has new data, silver_flow_1 will process it. If bronze_t5 has no new data, any silver table depending only on bronze_t5 won't do unnecessary work: streaming tables will simply have no new records to process.
Tip: Use Triggered mode (each update processes whatever data is available, then stops) or Continuous mode depending on your latency needs.
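As a rough sketch, the mode is a single flag in the pipeline settings. Treat the exact payload shape, names, and paths below as assumptions to verify against the Pipelines API docs for your workspace:

```python
# Hypothetical DLT pipeline settings payload (names and paths are placeholders).
# "continuous": False means Triggered mode: each update processes the data
# available at that moment, then stops; True keeps the pipeline always running.
pipeline_settings = {
    "name": "medallion_pipeline",
    "continuous": False,  # Triggered mode; set True for Continuous mode
    "libraries": [{"notebook": {"path": "/Repos/team/bronze_silver_dlt"}}],
    "target": "catalog.silver",
}

def latency_mode(settings: dict) -> str:
    """Classify a pipeline's latency mode from its settings."""
    return "continuous" if settings.get("continuous") else "triggered"
```

Triggered mode is usually the cheaper default; you only pay for compute while an update is running.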
---
Option 2: Separate Silver Pipelines Triggered via Databricks Workflows
If you need separate DLT pipelines per silver flow (for isolation, independent scheduling, or team ownership), you can orchestrate them using Databricks Workflows with dependencies:
Step 1: Create a Workflow where:
- Task 1: Bronze DLT pipeline (ingests all 100 tables)
- Task 2a: Silver Flow 1 DLT pipeline (depends on Task 1)
- Task 2b: Silver Flow 2 DLT pipeline (depends on Task 1)
- ...and so on
This ensures silver pipelines run after bronze completes. However, all silver pipelines will run every time.
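In Jobs API 2.1 terms, that task graph looks roughly like the payload below. The pipeline IDs and task keys are placeholders, so verify the field names against the Jobs API reference before using this:

```python
# Hypothetical Jobs API 2.1 payload sketch: one bronze pipeline task,
# two silver pipeline tasks that each wait for bronze to complete.
job_payload = {
    "name": "medallion_orchestration",
    "tasks": [
        {
            "task_key": "bronze_ingest",
            "pipeline_task": {"pipeline_id": "<bronze-pipeline-id>"},
        },
        {
            "task_key": "silver_flow_1",
            "depends_on": [{"task_key": "bronze_ingest"}],
            "pipeline_task": {"pipeline_id": "<silver-1-pipeline-id>"},
        },
        {
            "task_key": "silver_flow_2",
            "depends_on": [{"task_key": "bronze_ingest"}],
            "pipeline_task": {"pipeline_id": "<silver-2-pipeline-id>"},
        },
    ],
}

# Every silver task declares a dependency on the bronze task.
silver_tasks = [t for t in job_payload["tasks"] if t["task_key"].startswith("silver")]
```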
Step 2: To make it selective (only trigger silver flows whose source bronze tables changed), add a lightweight check:
```python
# In a notebook task that runs before each silver pipeline
def check_bronze_tables_changed(bronze_tables, since_timestamp):
    """Return True if any of the given bronze tables has a commit newer
    than since_timestamp (a datetime)."""
    for table in bronze_tables:
        # DESCRIBE HISTORY returns commits newest-first
        history = spark.sql(f"DESCRIBE HISTORY {table} LIMIT 1")
        latest = history.select("timestamp").first()
        if latest and latest["timestamp"] > since_timestamp:
            return True
    return False

# Example: Silver Flow 1 depends on t1, t2, t3
tables_changed = check_bronze_tables_changed(
    ["catalog.bronze.t1", "catalog.bronze.t2", "catalog.bronze.t3"],
    last_run_timestamp,  # a datetime you track in a control table or widget
)

if not tables_changed:
    dbutils.notebook.exit("SKIP - no upstream changes")
```
Then use the If/else condition task in Workflows (or use dbutils.notebook.exit() return values) to conditionally run or skip the downstream silver DLT pipeline.
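If you go the If/else route, the gate itself is a condition task. Here is a hedged sketch of what the Jobs API 2.1 shape could look like; the task keys, the task value key, and the `{{tasks...}}` templating string are assumptions to check against the docs:

```python
# Hypothetical condition task: runs the silver pipeline only when the
# upstream check notebook reported that its bronze sources changed,
# e.g. via dbutils.jobs.taskValues.set(key="tables_changed", value="true").
condition_task = {
    "task_key": "should_run_silver_1",
    "depends_on": [{"task_key": "check_silver_1_sources"}],
    "condition_task": {
        "op": "EQUAL_TO",
        "left": "{{tasks.check_silver_1_sources.values.tables_changed}}",
        "right": "true",
    },
}
```

The silver DLT pipeline task then depends on this condition task's "true" outcome.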
---
Option 3: Event-Driven with Delta Change Data Feed + Lakeflow Jobs
For a truly event-driven architecture:
Step 1: Enable Change Data Feed (CDF) on your bronze tables:
```sql
ALTER TABLE catalog.bronze.t1
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
```
Step 2: Use a Lakeflow/Workflow trigger. Configure a File Arrival trigger or use Databricks Asset Bundles to set up event-based triggers. When new files arrive in bronze source locations, only the relevant pipeline fires.
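A file arrival trigger is configured on the job rather than on a task. Roughly, it looks like the fragment below; the field names follow my reading of the Jobs API 2.1, and the storage URL is a placeholder, so verify both before relying on this:

```python
# Hypothetical file-arrival trigger config: the job fires when new files
# land in the monitored storage location.
trigger_config = {
    "trigger": {
        "pause_status": "UNPAUSED",
        "file_arrival": {
            "url": "abfss://landing@<storage-account>.dfs.core.windows.net/t1/",
            "min_time_between_triggers_seconds": 300,  # debounce bursts of files
        },
    }
}
```

One trigger per job, so this pairs naturally with one job per silver flow.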
Step 3: Or use a dispatcher notebook that reads the change data from bronze tables, maintains a mapping of bronze_table to silver_pipeline, and triggers only the relevant silver pipelines via the Jobs API:
```python
import requests

# Mapping: which silver pipeline (job) depends on which bronze tables.
# Keys are Jobs API job IDs (numeric), shown symbolically here.
DEPENDENCY_MAP = {
    "silver_pipeline_1_job_id": [
        "catalog.bronze.t1", "catalog.bronze.t2", "catalog.bronze.t3",
    ],
    "silver_pipeline_2_job_id": [
        "catalog.bronze.t5", "catalog.bronze.t11", "catalog.bronze.t13",
    ],
}

# Union of every bronze table referenced by any silver pipeline
ALL_BRONZE_TABLES = sorted({t for deps in DEPENDENCY_MAP.values() for t in deps})

def get_changed_tables(since_version):
    """Identify which bronze tables have changes since the given version.
    Note: Delta versions are per-table, so in practice track a version
    per table in a control table rather than one global value."""
    changed = set()
    for table in ALL_BRONZE_TABLES:
        try:
            changes = (spark.read.format("delta")
                       .option("readChangeFeed", "true")
                       .option("startingVersion", since_version)
                       .table(table))
            if changes.limit(1).count() > 0:  # cheaper than a full count
                changed.add(table)
        except Exception as e:
            print(f"Skipping {table}: {e}")  # e.g. CDF not yet enabled
    return changed

changed_tables = get_changed_tables(last_processed_version)

# Trigger only the silver pipelines whose dependencies changed
for job_id, dependencies in DEPENDENCY_MAP.items():
    if any(t in changed_tables for t in dependencies):
        response = requests.post(
            f"{host}/api/2.1/jobs/run-now",
            headers={"Authorization": f"Bearer {token}"},
            json={"job_id": int(job_id)},
        )
        print(f"Triggered job {job_id}: {response.status_code}")
```
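The dispatch decision at the end of that loop is easy to unit-test if you pull it out into a pure function. A small refactoring sketch (nothing here touches Spark or the Jobs API):

```python
def jobs_to_trigger(dependency_map: dict, changed_tables: set) -> list:
    """Return the job IDs whose bronze dependencies include a changed table."""
    return [
        job_id
        for job_id, dependencies in dependency_map.items()
        if any(t in changed_tables for t in dependencies)
    ]

# Example with a two-job mapping like the one above
demo_map = {
    "job_1": ["catalog.bronze.t1", "catalog.bronze.t2"],
    "job_2": ["catalog.bronze.t5"],
}
```

Testing this logic offline is cheap insurance, since a bug here either burns compute on unneeded runs or silently skips a silver flow.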
---
Summary and Recommendation
| Approach | Complexity | Best For |
| --- | --- | --- |
| Option 1: Single DLT Pipeline | Low | When all bronze+silver can live in one pipeline. DLT handles the DAG natively. |
| Option 2: Workflows + Conditional Tasks | Medium | When silver pipelines must be separate but you want a simple orchestration layer. |
| Option 3: Event-Driven (CDF + Dispatcher) | Higher | When you need true event-driven, minimal-compute triggering at scale. |
My recommendation: Start with Option 1 if possible. DLT's declarative model is built exactly for this use case: you define what each silver table reads from, and DLT figures out when and what to process. If you need pipeline isolation for operational reasons, go with Option 2 using DESCRIBE HISTORY checks, which is straightforward to implement and maintain.
Hope this helps! Let me know if you have questions about any of these approaches.
Anuj Lathi
Solutions Engineer @ Databricks