DLT table reading not performing file pruning on partition column

gkapri
New Contributor II

I have created a bronze table partitioned on processing date, which is a date column. In the silver table I filter on the processing date column to read the last 2 days of data, but it is reading 37 million rows even though the last 2 days' partitions contain only 24,722 rows. Below is a screenshot of the query profile in Delta Live Tables.

gkapri_0-1770221522007.png

Below is the function that filters the last 2 days of data:

from pyspark.sql.functions import col, date_sub, current_date

def filter_latest_records(df, timing_column='processing_date'):
    df = df.filter(col(timing_column) >= date_sub(current_date(), 1))
    return df
 
Can someone help me understand why it is reading 37 million rows?

saurabh18cs
Honored Contributor III

Hi, this happens because you need a literal, not a dynamic expression; with a dynamic expression, all partitions are scanned. It is better to use a literal. Try this:

from pyspark.sql.functions import col

def filter_latest_records(df, timing_column="processing_date"):
    yesterday = spark.sql("select date_sub(current_date(), 1) as d").first()["d"]
    return df.filter(col(timing_column) >= yesterday)
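An alternative sketch for computing the same literal without a Spark round-trip, in plain Python (this assumes the driver clock agrees with the session time zone, which may not hold in every deployment):

```python
from datetime import date, timedelta

# Compute yesterday as a plain Python value, so the filter receives a
# static literal that Spark can push down into the partition scan.
yesterday = date.today() - timedelta(days=1)

# With pyspark available (from pyspark.sql.functions import col), the
# filter would then be: df.filter(col("processing_date") >= yesterday)
```

Either way, the key point is the same: the comparison value must be a concrete literal at plan time, not an expression evaluated during the scan.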

@gkapri

Amit_Dass_Chmp
New Contributor III

@gkapri Hi, if you use current_date() directly in the filter, Spark often can't prune partitions.

What you can do here: if you first store current_date() in a variable (as a value), Spark can usually prune.

Also, try running EXPLAIN; you should see PartitionFilters like:

PartitionFilters: [isnotnull(processing_date#...), (processing_date#... >= 2026-02-05)]

Hope this helps!

gkapri
New Contributor II

I tried defining a variable and then filtering on it, and it worked in a Databricks notebook, as shown in the snippet below.

gkapri_0-1770464980158.png

But when I tried the same in Delta Live Tables, it did not prune data, as you can see in the snippet below.

gkapri_1-1770465048087.png

Not sure what is happening.

gkapri
New Contributor II

Also, one more thing: I am using readStream in the pipeline to read the bronze table and trying to get partition pruning, but it is not happening. Can someone suggest how to achieve partition pruning with a streaming table?

saurabh18cs
Honored Contributor III

Hi @gkapri, can you try this for DLT pipelines? For DLT we need a compile-time SQL expression rather than a runtime literal 🙂

def filter_latest_records(df, timing_column="processing_date"):
    return df.filter(
        col(timing_column) >= expr("date_sub(current_date(), 1)")
    )

@gkapri

Poorva21
Contributor II

Explanation

Static partition pruning does not work in streaming reads (readStream) by design.

In streaming, Spark:

Can prune files/partitions to some extent

Cannot prune rows inside files

Must scan entire files of selected partitions

Filters like processing_date >= current_date() - n are applied after data is read, not at scan time.

In DLT, current_date() is non-deterministic, which further limits pruning.

What query profile confirms

Partitions read < total partitions → file pruning happened

Rows read still very high → full file scan

Rows returned = 0 → filter applied post-scan

Key takeaway

Partition pruning in Delta streaming works only at file selection level, not row level.

Large historical files inside partitions will still be fully scanned.

Recommended approaches

Use batch read for Bronze → Silver if partition pruning is required

Use Auto Loader to read only new files

Avoid using readStream when selective historical reads are needed
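The file-level vs. row-level distinction above can be sketched with a toy model in plain Python (the numbers are illustrative and this is not how Spark is implemented): a batch read prunes files by partition value before opening them, while a streaming read opens every new file from the transaction log and only filters rows after the read.

```python
from datetime import date

# Toy model: files of a partitioned table, tagged with their partition value.
files = [
    {"partition": date(2026, 2, 1), "rows": 1_000_000},
    {"partition": date(2026, 2, 4), "rows": 12_000},
    {"partition": date(2026, 2, 5), "rows": 12_722},
]
cutoff = date(2026, 2, 4)

def batch_read(files, cutoff):
    # Batch: the partition filter is pushed into file selection,
    # so files outside the cutoff are never opened.
    pruned = [f for f in files if f["partition"] >= cutoff]
    return sum(f["rows"] for f in pruned)

def stream_read(files, cutoff):
    # Streaming: every new file from the transaction log is read;
    # the date filter drops rows only after the read.
    rows_read = sum(f["rows"] for f in files)
    rows_kept = sum(f["rows"] for f in files if f["partition"] >= cutoff)
    return rows_read, rows_kept

print(batch_read(files, cutoff))   # 24722 rows read
print(stream_read(files, cutoff))  # (1024722, 24722): full scan, small output
```

This mirrors the query profile in the original post: millions of rows read at the scan, a few thousand rows surviving the post-scan filter.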

Anish_2
New Contributor III

I am using AUTO CDC in silver, and AUTO CDC requires a streaming source. I tried using a batch read but it failed. If you have code handy for a batch read of bronze feeding AUTO CDC in silver, it would be very helpful @Poorva21

Anish_2
New Contributor III

Basically the ask here is to read only the new files of the bronze table using Auto Loader, do some basic transformations like renaming columns, and then do AUTO CDC in silver. But when we do readStream on bronze, it scans the whole table instead of pruning partitions. Can you please give an idea of how we can achieve this?

SteveOstrowski
Databricks Employee

Hi @gkapri,

Thanks for the detailed writeup. I can see from the thread that you have already tried the literal variable approach that works in a notebook but does not work inside a Lakeflow Spark Declarative Pipeline (SDP). Let me explain what is happening and how to address it.


WHY PARTITION PRUNING DOES NOT WORK WITH readStream IN SDP

The root cause is how Structured Streaming reads Delta tables versus how batch reads work. These are fundamentally different processing models:

1. Batch reads (spark.read) evaluate the full query plan at execution time, including partition filters. When you filter on a partition column with a literal value, Spark can push that filter down into the scan and skip entire partition directories. This is why your notebook test worked.

2. Streaming reads (spark.readStream) process the Delta transaction log incrementally. The stream tracks which files have already been processed using a checkpoint, and on each micro-batch it picks up NEW files from the log. The key point is that readStream does not apply your downstream filters at the file-selection stage. Instead, it reads all new files from the transaction log first, and then your filter is applied after the data is already read into memory. This is by design, because the streaming engine needs to maintain exactly-once processing guarantees by tracking every file.

3. Non-deterministic functions in SDP add another layer. Inside a Lakeflow Spark Declarative Pipeline (SDP), expressions like current_date() are evaluated at pipeline plan time. Even converting to a literal variable with spark.sql() may not resolve the issue because the SDP execution model handles variable evaluation differently than an interactive notebook.

So to summarize: the 37 million rows you see being read is the stream reading all new files from the bronze table's transaction log, and then your processing_date filter is applied after the fact, resulting in only 24,722 rows being kept.


WHAT ABOUT AUTO CDC (APPLY CHANGES)?

I noticed from the follow-up comments that you and @Anish_2 are using AUTO CDC (formerly APPLY CHANGES) for the silver layer, which requires a streaming source. This means you cannot simply switch to a batch read (spark.read) to get partition pruning. This is a real constraint.

Here are your options:


OPTION 1: LET THE STREAM HANDLE INCREMENTALITY (RECOMMENDED)

Instead of filtering by processing_date to limit the data, rely on the streaming checkpoint to handle incrementality for you. This is actually the intended design pattern for streaming pipelines:

import dlt
from pyspark.sql.functions import col

@dlt.table()
def silver_table():
    return spark.readStream.table("LIVE.bronze_table")

The stream will only process NEW files that arrived since the last pipeline run. After the initial backfill (which will read all historical data once), subsequent runs will only pick up incremental data. This eliminates the need for the date filter entirely because the checkpoint ensures you never reprocess old data.
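The checkpoint-driven incrementality described above can be pictured as a persisted set of already-processed files (a toy sketch only; the real checkpoint also tracks stream offsets and Delta table versions):

```python
# Toy model of streaming checkpointing: each micro-batch processes only
# files not yet recorded in the checkpoint, so history is never re-read.
checkpoint = set()  # persisted to the checkpoint location in real pipelines

def micro_batch(all_files):
    new_files = [f for f in all_files if f not in checkpoint]
    checkpoint.update(new_files)
    return new_files

# First run: the initial backfill reads everything once.
files = ["part-0001", "part-0002"]
assert micro_batch(files) == ["part-0001", "part-0002"]

# Later run: only the newly arrived file is processed.
files.append("part-0003")
assert micro_batch(files) == ["part-0003"]
```

This is why a date filter is unnecessary for incrementality in a healthy streaming pipeline: the checkpoint already guarantees that old files are not reprocessed.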

If your concern is the initial backfill reading too much data, you can control the rate with:

@dlt.table()
def silver_table():
    return (
        spark.readStream
        .option("maxFilesPerTrigger", 100)
        .table("LIVE.bronze_table")
    )

This throttles how many files are processed per micro-batch during the initial catch-up.


OPTION 2: USE skipChangeCommits WITH A TARGETED STREAM

If your bronze table receives updates or deletes and you only want appends, you can combine streaming with skipChangeCommits:

@dlt.table()
def silver_table():
    return (
        spark.readStream
        .option("skipChangeCommits", "true")
        .table("LIVE.bronze_table")
    )

This skips any commits that contain updates or deletes, processing only append commits.


OPTION 3: USE A MATERIALIZED VIEW FOR THE FILTERED LAYER

If you truly need partition pruning with a date filter, use a materialized view for that transformation step, since materialized views use batch reads:

CREATE OR REFRESH MATERIALIZED VIEW silver_filtered
AS
SELECT *
FROM LIVE.bronze_table
WHERE processing_date >= current_date() - INTERVAL 2 DAYS

Materialized views process data using batch reads, so partition pruning works as expected. The trade-off is that materialized views fully recompute on each refresh (though Databricks does apply incremental refresh optimizations where possible).

However, if you need to feed this into an AUTO CDC target, you would need an additional streaming table layer reading from this materialized view.


OPTION 4: USE startingTimestamp TO LIMIT INITIAL LOAD

If your main concern is the initial pipeline run reading too much historical data, you can set a starting point for the stream:

@dlt.table()
def silver_table():
    return (
        spark.readStream
        .option("startingTimestamp", "2026-02-01")
        .table("LIVE.bronze_table")
    )

This tells the stream to only process changes from that timestamp forward, skipping all earlier data. After the initial run, the checkpoint takes over and only new data is processed.


ADDITIONAL PERFORMANCE TIPS

1. Enable auto-compaction on your bronze table. If your bronze streaming table creates many small files, queries (and downstream streams) will be slower. Lakeflow Spark Declarative Pipelines (SDP) enables auto-optimization by default, but verify it is working by checking the file count:

DESCRIBE DETAIL your_catalog.your_schema.bronze_table

2. Consider liquid clustering instead of partitioning. For new tables, liquid clustering (CLUSTER BY) often outperforms traditional partitioning because it handles data skipping more flexibly:

CREATE OR REFRESH STREAMING TABLE bronze_table
CLUSTER BY (processing_date)
AS SELECT ...

3. Run OPTIMIZE periodically on the bronze table if you see many small files accumulating, or enable predictive optimization at the catalog or schema level.


KEY TAKEAWAY

The fundamental insight is that streaming reads and batch reads use different mechanisms for data access. Streaming reads track the transaction log incrementally and do not apply partition pruning on downstream filters. The recommended pattern for streaming pipelines is to rely on checkpointing for incrementality rather than date-based filters. Your date filter still works for correctness (it correctly filters the output), but it cannot reduce the amount of data read at the source scan level in a streaming context.


RELEVANT DOCUMENTATION

Delta Lake as a streaming source:
https://docs.databricks.com/en/structured-streaming/delta-lake.html

Lakeflow Spark Declarative Pipelines (SDP) streaming tables:
https://docs.databricks.com/en/ldp/concepts.html

AUTO CDC (APPLY CHANGES) reference:
https://docs.databricks.com/en/ldp/cdc.html

Data skipping and partition pruning:
https://docs.databricks.com/en/delta/data-skipping.html

Liquid clustering:
https://docs.databricks.com/en/delta/clustering.html

File size tuning and auto-compaction:
https://docs.databricks.com/en/delta/tune-file-size.html


Hope this helps you and @Anish_2! The short answer is that this is expected behavior for streaming reads, and the best approach is to let the streaming checkpoint handle incrementality rather than using date filters to limit the scan.

* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.

Hello @SteveOstrowski

Thank you for the detailed explanation of why partition pruning doesn't work in streaming queries. Currently I am using dlt.view instead of dlt.table. Will @dlt.view work the same way as @dlt.table? I want to avoid creating a duplicate bronze table.

SteveOstrowski
Databricks Employee

Hi Anish,

Yes, @dlt.view (or @dp.temporary_view in the newer Lakeflow Declarative Pipelines syntax) will work for your use case. The key difference is that a view does not materialize/persist data as a physical Delta table; it acts as a temporary, virtual definition within the pipeline. So if your goal is to avoid creating a duplicate bronze table, using @dlt.view instead of @dlt.table is the right approach. The view defines the transformation logic without writing a separate copy of the data.

One thing to keep in mind: the partition pruning limitation I described earlier applies at the streaming read level, not at the materialization level. Streaming reads track the Delta transaction log incrementally and do not push down partition filters. That behavior is the same regardless of whether the downstream dataset is defined as a @dlt.view or @dlt.table. So the view avoids the duplicate table, but the streaming source will still scan all partitions as new files arrive.

If you need partition-level filtering on the source, you would need a batch read (non-streaming) with explicit filter pushdown, but that changes the incremental processing semantics. For most bronze ingestion patterns, the streaming approach with a view is the right tradeoff.


Hello @SteveOstrowski, currently there are 2 pipelines running in parallel. Both pipelines consist of bronze, view, and silver tables. The 1st pipeline has around 40+ tables and the 2nd has 20. I am not able to figure out why the pipeline with fewer tables is taking more time to initialize and load data. Could you please share steps to debug why it is taking so long?

SteveOstrowski
Databricks Employee

Hi Anish,

There are several things that could explain why a pipeline with fewer tables takes longer to initialize and load. The number of tables alone does not determine pipeline performance — it depends on the complexity of each flow, data volumes, and how the pipeline is configured. Here are some concrete steps to debug this:

1. Check the event log for timing breakdowns

Query the pipeline event log for flow_progress events to see per-table duration_seconds. This will tell you exactly which tables in the slower pipeline are taking the most time during initialization and data loading. Also look at update_progress events to see how long the INITIALIZING and SETTING_UP_TABLES phases take.

2. Look at the INITIALIZING and SETTING_UP_TABLES phases

During INITIALIZING, the pipeline builds its logical plan and dependency graph — complex dependencies or CDC transformations add overhead here. During SETTING_UP_TABLES, schema validation and table state assessment happen for every defined table. If the 20-table pipeline has more complex transformations (e.g., apply_changes / SCD operations, many views, or heavier joins), it could take longer than the 40-table pipeline with simpler flows.

3. Check for driver bottlenecks

Monitor driver CPU and memory. Even with fewer tables, if the individual streaming flows are more resource-intensive (larger state, more complex logic), the driver can become a bottleneck.

4. Compare data volumes and file counts

Use the operation_progress events in the event log to check Auto Loader file listing counts and backlog bytes. The slower pipeline may be processing significantly more data per table, or dealing with many small files which degrade read performance.

5. Review pipeline mode

If both pipelines are in triggered mode, they perform initialization steps on every trigger. Continuous pipelines only initialize on restart. If only the slower one is triggered, that could explain the difference.


Anish_2
New Contributor III

@SteveOstrowski I have encountered one more issue with AUTO CDC. Below is a snippet of my AUTO CDC setup where there are 2 source tables for vlc_hist. After completion, I validated the data, but data from source 1 is still missing from the vlc_hist silver table. Could you please let me know where I went wrong?

Screenshot 2026-03-21 014734.png