Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT table reading not performing file pruning on partition column

gkapri
New Contributor II

I have created a bronze table partitioned on processing_date, which is a date column. In the silver table I filter on processing_date to read only the last 2 days of data, but the query is reading 37 million rows even though the last 2 days of partitions contain only 24,722 rows. Below is a screenshot of the query profile in Delta Live Tables.

[Screenshot: Delta Live Tables query profile — gkapri_0-1770221522007.png]

Below is the function that filters for the last 2 days of data:

from pyspark.sql.functions import col, current_date, date_sub

def filter_latest_records(df, timing_column='processing_date'):
    df = df.filter(col(timing_column) >= date_sub(current_date(), 1))
    return df

Can someone help me understand why it is reading 37 million rows?

SteveOstrowski
Databricks Employee

Hi @Anish_2,

Looking at your pipeline DAG, the issue is that you have two separate APPLY CHANGES INTO flows both targeting the same silver table (ag_vlc_hist), one from ag_swt_vlchistory_historical and one from ag_swt_vlchistory. When you define multiple APPLY CHANGES INTO statements against the same target without giving each flow a unique name, they both default to using the target table name as the flow name. This means they share the same internal checkpoint, and the second flow's results effectively replace the first. That is why data from source 1 is missing.

THE FIX: USE UNIQUE FLOW NAMES (RECOMMENDED)

Lakeflow Spark Declarative Pipelines (SDP) supports multiple APPLY CHANGES flows targeting a single streaming table, but each flow must have a unique name. You do this with the flow_name parameter in Python or the CREATE FLOW syntax in SQL. Each named flow gets its own checkpoint, so both sources process independently and merge correctly into the target.

Python example:

import dlt
from pyspark.sql.functions import col

dlt.create_streaming_table("ag_vlc_hist")

dlt.apply_changes(
    flow_name="historical_flow",
    target="ag_vlc_hist",
    source="ag_swt_vlchistory_historical",
    keys=["your_primary_key_column"],
    sequence_by=col("your_sequence_column"),
    stored_as_scd_type=1
)

dlt.apply_changes(
    flow_name="raw_flow",
    target="ag_vlc_hist",
    source="ag_swt_vlchistory",
    keys=["your_primary_key_column"],
    sequence_by=col("your_sequence_column"),
    stored_as_scd_type=1
)

SQL example:

CREATE OR REFRESH STREAMING TABLE ag_vlc_hist;

CREATE FLOW historical_flow
  AS APPLY CHANGES INTO LIVE.ag_vlc_hist
  FROM STREAM(LIVE.ag_swt_vlchistory_historical)
  KEYS (your_primary_key_column)
  SEQUENCE BY your_sequence_column
  STORED AS SCD TYPE 1;

CREATE FLOW raw_flow
  AS APPLY CHANGES INTO LIVE.ag_vlc_hist
  FROM STREAM(LIVE.ag_swt_vlchistory)
  KEYS (your_primary_key_column)
  SEQUENCE BY your_sequence_column
  STORED AS SCD TYPE 1;

This is the preferred approach because each source gets its own checkpoint and can progress independently. If one source fails or is slow, the other is unaffected.

ALTERNATIVE: UNION SOURCES FIRST

If you prefer a simpler setup with a single flow, you can UNION both sources into a combined view and use one APPLY CHANGES INTO:

import dlt
from pyspark.sql.functions import col

@dlt.view()
def ag_swt_vlchistory_combined():
    historical = spark.readStream.table("LIVE.ag_swt_vlchistory_historical")
    raw = spark.readStream.table("LIVE.ag_swt_vlchistory")
    return historical.unionByName(raw, allowMissingColumns=True)

dlt.create_streaming_table("ag_vlc_hist")

dlt.apply_changes(
    target="ag_vlc_hist",
    source="ag_swt_vlchistory_combined",
    keys=["your_primary_key_column"],
    sequence_by=col("your_sequence_column"),
    stored_as_scd_type=1
)

This works well but couples both sources into a single checkpoint, so a failure in either source blocks the other.

IMPORTANT REQUIREMENTS

  1. Same keys across flows: All APPLY CHANGES flows writing to the same target must use the same keys. The sequencing must also be consistent so the engine can correctly determine which record is latest when the same key appears in both sources.

  2. Schema alignment: Make sure both sources have compatible column names and types. If they differ slightly, normalize them in the source views before they reach APPLY CHANGES.

  3. No mixing flow types: A table targeted by APPLY CHANGES flows can only have APPLY CHANGES flows. You cannot mix APPLY CHANGES and regular append flows on the same target.

  4. Pipeline channel: The multiple named flows feature may require the Preview channel depending on your Databricks runtime version. If you get an error, try setting your pipeline to the Preview channel in the pipeline settings.

DOCUMENTATION

Flows in Lakeflow Spark Declarative Pipelines (SDP): https://docs.databricks.com/en/dlt/flows.html

APPLY CHANGES APIs: https://docs.databricks.com/en/dlt/cdc.html

Python API reference (flow_name parameter): https://docs.databricks.com/en/dlt-ref/dlt-python-ref-apply-changes.html

  • This reply was drafted with an agent system I built, which researches responses from the documentation available to me and from previous memory. I personally review each draft for obvious issues and to monitor system reliability, and I update it when I detect drift, but there is still a small chance something is inaccurate, especially if you are experimenting with brand-new features.

If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.