05-13-2025 12:20 PM
Hello, I'm working on a DLT pipeline to build what would be a data warehouse / data mart. I'm facing issues trying to "update" my fact table when the dimensions that are maintained outside the pipeline are not up to date at my processing time, so on the next run I'm trying to "update the dimension reference IDs" once I'm able to get a match.
My current approach is using 2 Materialized views:
# Command 1:
import dlt
from pyspark.sql.functions import col

@dlt.table
def vw_fact():
    df = spark.read.table("my_view")
    df_dim_1 = spark.read.table(dim_table_1)
    df_dim_2 = spark.read.table(dim_table_2)  # Dimension table maintained externally from the DLT pipeline
    df_joined = (
        df.alias("fact")
        .join(
            df_dim_1.alias("dim_1"),
            on=col("fact.ref_dim1_code") == col("dim_1.code_dim_1"),
            how="left"
        )
        .join(
            df_dim_2.alias("dim_2"),
            on=col("fact.ref_dim2_code") == col("dim_2.code_dim_2"),
            how="left"
        )
        .select(
            col("fact.id"),
            col("fact.col_1"),
            col("fact.col_2"),
            col("fact.col_3"),
            # ... other fact columns ...
            col("dim_1.id_dim_1"),
            col("dim_2.id_dim_2"),  # Externally maintained dimension id; may not be present at the time of the first processing
            # ... other fact columns ...
            col("fact.col_9"),
            col("fact.col_10"),
            col("fact.inserted_timestamp")
        )
    )
    return df_joined
# Command 2:
table_args = {
    "name": destination_fact,
    "table_properties": {"quality": "silver"},
    "comment": "Silver table to store Fact..."
}
dlt.create_streaming_table(**table_args)

dlt.apply_changes(
    target=destination_fact,
    source="vw_fact",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1"
)
# Command 3:
# The intention here is to pick up the Dim 2 ids that arrived since the last processing
# and were not yet present during the previous execution of Commands 1 and 2
@dlt.table(name="fix_missing_dim_2")
def fix_missing_dim_2():
    df_fact = spark.read.table(destination_fact)
    df_dim_2 = spark.read.table(dim_table_2)
    return (
        df_fact.alias("fact")
        .join(
            df_dim_2.alias("dim_2"),
            on=col("fact.ref_dim2_code") == col("dim_2.code_dim_2"),
            how="inner"
        )
        .filter(col("fact.id_dim_2").isNull() & col("dim_2.id_dim_2").isNotNull())
        .select(
            col("fact.id"),
            col("dim_2.id_dim_2"),
            col("fact.inserted_timestamp")
        )
    )
# Command 4:
dlt.apply_changes(
    target=destination_fact,
    source="fix_missing_dim_2",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1"
)
But this generates the following error:
AnalysisException: Cannot have multiple queries named `catalog`.`schema`.`destination_fact` for `catalog`.`schema`.`destination_fact`. Additional queries on that table must be named. Note that unnamed queries default to the same name as the table
Then I ran into "append flows", where there is also mention of an apply_changes "type of flow", and the documentation states this:
"...Multiple apply changes flows can target a single streaming table. A streaming table that acts as a target for an apply changes flow can only be targeted by other apply changes flows...."
But there are no concrete examples of its usage.
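For reference, my understanding of the append flow pattern from that page is roughly the following; this is a minimal sketch with made-up table names ("combined_events", "source_a", "source_b"), not my actual pipeline:
import dlt

# Multiple append flows writing into one streaming table, per my reading of the docs.
dlt.create_streaming_table(name="combined_events")

@dlt.append_flow(target="combined_events", name="flow_from_source_a")
def flow_from_source_a():
    return spark.readStream.table("source_a")

@dlt.append_flow(target="combined_events", name="flow_from_source_b")
def flow_from_source_b():
    return spark.readStream.table("source_b")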
Any help would be highly appreciated!
05-13-2025 12:32 PM
The error, "Cannot have multiple queries named `catalog`.`schema`.`destination_fact` for `catalog`.`schema`.`destination_fact`. Additional queries on that table must be named," arises because Delta Live Tables (DLT) disallows multiple unnamed queries targeting the same table. This limitation stems from how DLT processes and manages tables within a pipeline. To resolve this, you must assign unique flow names to each apply_changes command targeting the same table. Naming each apply_changes flow resolves this issue. Here's how this can be implemented: update both apply_changes statements to include a flow_name parameter. This parameter uniquely identifies each change flow targeting the same table.
# Apply Changes for Initial Ingestion
dlt.apply_changes(
    target=destination_fact,
    source="vw_fact",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1",
    flow_name="initial_ingestion_flow"
)

# Apply Changes for Fixing Missing Dimension 2 IDs
dlt.apply_changes(
    target=destination_fact,
    source="fix_missing_dim_2",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1",
    flow_name="missing_dim_2_fix_flow"
)
"initial_ingestion_flow"
and "missing_dim_2_fix_flow"
, you ensure that DLT can differentiate between the change flows. This approach aligns with the solution mentioned in community discussions and documentation.05-13-2025 12:43 PM
Hello Lou,
Thanks a lot for your reply. I got a new error stating that I was trying to use Private Preview features; I've switched my pipeline channel to preview to test it and will follow up.
05-14-2025 04:39 PM
Hello Lou,
Like I mentioned yesterday, thanks to your help I was able to move forward from the error I had, but now I'm facing the following error:
cell 28, line 4, in fix_missing_dim_2
    df_fact = dlt.read("fact")
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
pyspark.errors.exceptions.captured.AnalysisException: Failed to read dataset 'catalog.schema.fact'. Dataset is defined in the pipeline but could not be resolved.
This is happening in Command 3, specifically on this line:
df_fact = spark.read.table(destination_fact)
It's as if the pipeline is not recognizing the previous step, where I already executed:
dlt.create_streaming_table(**table_args)
Through trial and error, for the time being I have that first line as:
df_fact = dlt.read("fact")
But that is still not working.
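For clarity on what I think the two read styles mean (illustrative, made-up dataset names only; I realize neither may be the right approach for a table that is itself an apply_changes target in the same pipeline):
import dlt

@dlt.table(name="reads_a_pipeline_dataset")
def reads_a_pipeline_dataset():
    # dlt.read() refers to a dataset defined in this same pipeline by its name.
    return dlt.read("some_pipeline_dataset")

@dlt.table(name="reads_a_catalog_table")
def reads_a_catalog_table():
    # spark.read.table() with a fully qualified name reads through the catalog;
    # if that name also matches a dataset defined in this pipeline, DLT still
    # treats it as a pipeline dependency.
    return spark.read.table("catalog.schema.some_external_table")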
Thanks again for your time and help