Hi @Anish_2,
Looking at your pipeline DAG, the issue is that you have two separate APPLY CHANGES INTO flows both targeting the same silver table (ag_vlc_hist), one from ag_swt_vlchistory_historical and one from ag_swt_vlchistory. When you define multiple APPLY CHANGES INTO statements against the same target without giving each flow a unique name, they both default to using the target table name as the flow name. This means they share the same internal checkpoint, and the second flow's results effectively replace the first. That is why data from source 1 is missing.
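As a loose analogy in plain Python (this is not the actual DLT engine, just an illustration of why name collisions lose a flow): if flows are registered in a map keyed by flow name, and the name defaults to the target table when you don't supply one, the second registration silently replaces the first:

```python
# Loose analogy only: flow registration keyed by flow name (not DLT internals).
flows = {}

def register_flow(target, source, flow_name=None):
    # With no explicit flow_name, the name defaults to the target table name.
    name = flow_name or target
    flows[name] = {"target": target, "source": source}

# Two APPLY CHANGES into the same target, neither one named:
register_flow("ag_vlc_hist", "ag_swt_vlchistory_historical")
register_flow("ag_vlc_hist", "ag_swt_vlchistory")
# Only one entry survives, and it points at the second source --
# the historical flow has been silently replaced.

# With unique names, both flows coexist:
register_flow("ag_vlc_hist", "ag_swt_vlchistory_historical", flow_name="historical_flow")
register_flow("ag_vlc_hist", "ag_swt_vlchistory", flow_name="raw_flow")
```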
THE FIX: USE UNIQUE FLOW NAMES (RECOMMENDED)
Lakeflow Spark Declarative Pipelines (SDP) supports multiple APPLY CHANGES flows targeting a single streaming table, but each flow must have a unique name. You do this with the flow_name parameter in Python or the CREATE FLOW syntax in SQL. Each named flow gets its own checkpoint, so both sources process independently and merge correctly into the target.
Python example:
import dlt
from pyspark.sql.functions import col

dlt.create_streaming_table("ag_vlc_hist")

dlt.apply_changes(
    flow_name="historical_flow",
    target="ag_vlc_hist",
    source="ag_swt_vlchistory_historical",
    keys=["your_primary_key_column"],
    sequence_by=col("your_sequence_column"),
    stored_as_scd_type=1
)

dlt.apply_changes(
    flow_name="raw_flow",
    target="ag_vlc_hist",
    source="ag_swt_vlchistory",
    keys=["your_primary_key_column"],
    sequence_by=col("your_sequence_column"),
    stored_as_scd_type=1
)
SQL example:
CREATE OR REFRESH STREAMING TABLE ag_vlc_hist;

CREATE FLOW historical_flow
AS APPLY CHANGES INTO LIVE.ag_vlc_hist
  FROM STREAM(LIVE.ag_swt_vlchistory_historical)
  KEYS (your_primary_key_column)
  SEQUENCE BY your_sequence_column
  STORED AS SCD TYPE 1;

CREATE FLOW raw_flow
AS APPLY CHANGES INTO LIVE.ag_vlc_hist
  FROM STREAM(LIVE.ag_swt_vlchistory)
  KEYS (your_primary_key_column)
  SEQUENCE BY your_sequence_column
  STORED AS SCD TYPE 1;
This is the preferred approach because each source gets its own checkpoint and can progress independently. If one source fails or is slow, the other is unaffected.
ALTERNATIVE: UNION SOURCES FIRST
If you prefer a simpler setup with a single flow, you can UNION both sources into a combined view and use one APPLY CHANGES INTO:
import dlt
from pyspark.sql.functions import col

@dlt.view()
def ag_swt_vlchistory_combined():
    historical = spark.readStream.table("LIVE.ag_swt_vlchistory_historical")
    raw = spark.readStream.table("LIVE.ag_swt_vlchistory")
    return historical.unionByName(raw, allowMissingColumns=True)

dlt.create_streaming_table("ag_vlc_hist")

dlt.apply_changes(
    target="ag_vlc_hist",
    source="ag_swt_vlchistory_combined",
    keys=["your_primary_key_column"],
    sequence_by=col("your_sequence_column"),
    stored_as_scd_type=1
)
This works well but couples both sources into a single checkpoint, so a failure in either source blocks the other.
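For reference, unionByName with allowMissingColumns=True aligns columns by name and fills any column missing from one side with nulls. A rough pure-Python sketch of that alignment (dicts standing in for rows, None standing in for null; the column names here are just placeholders):

```python
def union_by_name(rows_a, rows_b):
    # Mimics DataFrame.unionByName(..., allowMissingColumns=True):
    # the output schema is the union of both column sets,
    # and values absent from a row become None (null).
    columns = sorted(set().union(*(r.keys() for r in rows_a + rows_b)))
    return [{c: row.get(c) for c in columns} for row in rows_a + rows_b]

historical = [{"id": 1, "val": "a", "load_date": "2023-01-01"}]
raw = [{"id": 2, "val": "b"}]  # this source has no load_date column
combined = union_by_name(historical, raw)
# Every output row now has id, load_date, and val;
# the raw row's load_date is None.
```

This is why schema drift between the two sources is worth checking before you rely on the union: columns that exist in only one source come through as nulls for the other.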
IMPORTANT REQUIREMENTS
- Same keys across flows: All APPLY CHANGES flows writing to the same target must use the same keys, and the sequencing must be consistent so the engine can correctly determine which record is latest when the same key appears in both sources.
- Schema alignment: Make sure both sources have compatible column names and types. If they differ slightly, normalize them in the source views before they reach APPLY CHANGES.
- No mixing flow types: A table targeted by APPLY CHANGES flows can only have APPLY CHANGES flows. You cannot mix APPLY CHANGES and regular append flows on the same target.
- Pipeline channel: The multiple named flows feature may require the Preview channel depending on your Databricks Runtime version. If you get an error, try switching your pipeline to the Preview channel in the pipeline settings.
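To make the "same keys, consistent sequencing" requirement concrete, here is a pure-Python sketch of SCD Type 1 semantics (not the DLT implementation): when the same key arrives from both sources, the record with the highest value in the sequencing column wins and no history is kept. Column names are placeholders:

```python
def scd_type1_merge(change_rows, key="id", sequence_by="seq"):
    # SCD Type 1: keep only the latest version of each key,
    # where "latest" is decided by the sequencing column.
    latest = {}
    for row in change_rows:
        k = row[key]
        if k not in latest or row[sequence_by] > latest[k][sequence_by]:
            latest[k] = row
    return latest

# Interleaved changes from both flows, including the same key twice:
changes = [
    {"id": 1, "seq": 1, "val": "historical"},  # from the historical flow
    {"id": 1, "seq": 2, "val": "raw"},         # later change from the raw flow
    {"id": 2, "seq": 1, "val": "historical"},
]
result = scd_type1_merge(changes)
# Key 1 resolves to the seq=2 "raw" record; key 2 keeps its only record.
```

If the two sources used different sequencing columns (or incomparable values), this "which record is latest" decision would be meaningless, which is why the sequencing must be consistent across flows.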
DOCUMENTATION
Flows in Lakeflow Spark Declarative Pipelines (SDP): https://docs.databricks.com/en/dlt/flows.html
APPLY CHANGES APIs: https://docs.databricks.com/en/dlt/cdc.html
Python API reference (flow_name parameter): https://docs.databricks.com/en/dlt-ref/dlt-python-ref-apply-changes.html
This reply was drafted with an agent system I built, which researches responses using the documentation I have available and previous memory. I personally review each draft for obvious issues, monitor the system's reliability, and update the answer when I detect drift, but there is still a small chance something is inaccurate, especially if you are experimenting with brand-new features.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.