Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Found issue with DLT for each batch Sink.

prafulja
New Contributor II

We are creating a Bronze table on top of ADLS data using Auto Loader with DLT. After that, we create the Silver table using a for-each-batch sink. Finally, we create the Gold table through a DLT materialized view.

However, when creating the Gold table through DLT, we get an error that the Silver table is not found. When we create the Silver table using the normal DLT mode (without the for-each-batch sink), everything works fine.

It seems DLT is unable to recognize whether the sink table has been created. 

Could someone please help me here?

1 ACCEPTED SOLUTION

Accepted Solutions

SteveOstrowski
Databricks Employee

Hi @prafulja,

This is expected behavior based on how sinks work in Lakeflow Spark Declarative Pipelines (SDP), which is the current name for what was previously called DLT. The key point: sinks (including for_each_batch sinks) cannot be referenced by other datasets within the same pipeline. The documentation states that sink output cannot be consumed by downstream pipeline components.

Here is what is happening in your case:

1. Your Bronze table is created via Auto Loader as a standard SDP streaming table, so it is fully tracked in the pipeline's dependency graph.

2. Your Silver table is created using a for_each_batch sink. The for_each_batch sink writes data using custom Python logic outside of the pipeline's dependency tracking. SDP does not manage or track tables written by for_each_batch. As far as the pipeline is concerned, that sink is a "fire and forget" output.

3. Your Gold materialized view tries to reference the Silver table, but because the Silver table was written by a for_each_batch sink (not defined as a pipeline dataset), SDP cannot resolve it as a dependency, and you get the "table not found" error.

When you switch to creating the Silver table as a normal SDP streaming table or materialized view (without for_each_batch), it works because the table is registered in the pipeline's dependency graph.


RECOMMENDED APPROACHES

Option 1: Use a standard SDP streaming table for Silver (recommended)

If your Silver layer logic can be expressed as a streaming table definition, this is the cleanest approach. It keeps the full Bronze -> Silver -> Gold lineage within the pipeline:

from pyspark import pipelines as dp

@dp.table()
def silver_table():
    return (
        spark.readStream
            .table("LIVE.bronze_table")
            .select(...)  # your transformations
    )

@dp.table()
def gold_table():
    return (
        spark.read.table("LIVE.silver_table")
            .groupBy(...)
            .agg(...)
    )


Option 2: Split into two pipelines

If you truly need for_each_batch for your Silver layer (for example, for MERGE operations or writing to multiple targets), you can split this into two separate pipelines:

Pipeline 1: Bronze -> Silver (using for_each_batch sink that writes to a Unity Catalog table)
Pipeline 2: Gold materialized view that reads from the Silver UC table
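For reference, the custom logic inside Pipeline 1's for_each_batch sink might look like the following sketch, which MERGEs each micro-batch from Bronze into a Unity Catalog Silver table. The table name and join key are illustrative, and the sink registration itself is omitted; this only shows the shape of the batch function:

```python
from delta.tables import DeltaTable

def upsert_to_silver(batch_df, batch_id):
    # Target is a regular Unity Catalog table, managed outside the
    # pipeline's dependency graph (SDP will not track this write).
    silver = DeltaTable.forName(
        batch_df.sparkSession, "my_catalog.my_schema.silver_table"
    )
    (
        silver.alias("t")
        .merge(batch_df.alias("s"), "t.id = s.id")  # join key is illustrative
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```

Because this write happens outside the pipeline graph, only a separate pipeline (or job task) can safely read the result afterwards.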

In the second pipeline, reference the Silver table by its full Unity Catalog name (catalog.schema.table) rather than using LIVE:

from pyspark import pipelines as dp

@dp.table()
def gold_table():
    return (
        spark.read.table("my_catalog.my_schema.silver_table")
            .groupBy(...)
            .agg(...)
    )

You can orchestrate these two pipelines sequentially using a Databricks Workflow job so that Pipeline 2 runs after Pipeline 1 completes.
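As one possible way to wire this up, a Databricks Asset Bundles job definition along these lines runs the Gold pipeline only after the Silver pipeline finishes (the job name, task keys, and pipeline IDs are placeholders):

```yaml
resources:
  jobs:
    medallion_job:
      name: silver-then-gold
      tasks:
        - task_key: run_silver_pipeline
          pipeline_task:
            pipeline_id: <pipeline-1-id>
        - task_key: run_gold_pipeline
          depends_on:
            - task_key: run_silver_pipeline
          pipeline_task:
            pipeline_id: <pipeline-2-id>
```

The `depends_on` entry is what guarantees the Silver write completes before the Gold materialized view refreshes.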


Option 3: Use for_each_batch for Silver, reference the UC table directly in the same pipeline

If you want to keep everything in one pipeline, you can have the for_each_batch sink write to a Unity Catalog table, and then have the Gold materialized view read from that UC table by its fully qualified name (not via LIVE). Note that SDP will not track this as a dependency, so there is no guarantee that the Silver table write completes before the Gold table reads. This approach is less reliable than Option 2 and is generally not recommended.


ADDITIONAL NOTES ON FOR_EACH_BATCH

- The pipeline cannot track or clean up data written by for_each_batch custom code. You are responsible for downstream data management.
- On a full refresh, checkpoints reset and you must manually manage cleanup of external tables written by for_each_batch.
- If you need idempotent writes, use the txnVersion and txnAppId options when writing from for_each_batch.
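As a sketch of that last bullet, inside a foreachBatch function the idempotent-write options look like this (the app id and table name are illustrative):

```python
def write_batch(batch_df, batch_id):
    (
        batch_df.write.format("delta")
        # txnAppId should be a stable identifier for this logical stream;
        # txnVersion must increase monotonically -- the batch id works.
        .option("txnAppId", "silver_for_each_batch")
        .option("txnVersion", batch_id)
        .mode("append")
        .saveAsTable("my_catalog.my_schema.silver_table")
    )
```

With these options set, Delta skips a batch whose (txnAppId, txnVersion) pair has already been committed, so a retried micro-batch is not written twice.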

Documentation references:
https://docs.databricks.com/aws/en/ldp/sinks.html
https://docs.databricks.com/aws/en/ldp/for-each-batch
https://docs.databricks.com/aws/en/ldp/ldp-sinks

* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.


3 REPLIES

MoJaMa
Databricks Employee

Could you please share the code you are using for this? Feel free to sanitize what's needed. Mostly I'd like to see how you create the silver table and how you refer to it in the gold layer. Please share the success and failure scenario code.


prafulja
New Contributor II

Thank you for sharing the detailed explanation. I was following the same approach, but the challenge was that with foreachBatch, SDP wasn't able to reliably track whether the table had been created or not. When I tried Option 3 (without using the LIVE keyword in the Gold MV), it still failed to detect the Silver table.

As you mentioned, the only remaining option is to use two separate SDP pipelines instead of a single one. I have validated this approach, and it is working as expected.