DLT Pipeline upsert question

oscarramosp
New Contributor II

Hello, I'm working on a DLT pipeline to build what would be a data warehouse/data mart. I'm having trouble "updating" my fact table when dimensions that are maintained outside the pipeline are not yet up to date at processing time, so on the next run I'm trying to update the dimension reference IDs once I'm able to get a match.

My current approach is using 2 Materialized views:

# Command 1:
@dlt.table
def vw_fact():
    """Build the fact rows with their dimension ids attached.

    Reads "my_view" and left-joins both dimension tables on their codes.
    Because the joins are left joins, a fact row with no matching dimension
    row is kept and its dimension id column is null.
    """
    facts = spark.read.table("my_view").alias("fact")
    dim_1 = spark.read.table(dim_table_1).alias("dim_1")
    # dim_table_2 is maintained outside of this DLT pipeline.
    dim_2 = spark.read.table(dim_table_2).alias("dim_2")

    dim_1_match = col("fact.ref_dim1_code") == col("dim_1.code_dim_1")
    dim_2_match = col("fact.ref_dim2_code") == col("dim_2.code_dim_2")

    # The bare `...` entries are placeholders for columns omitted from this snippet.
    output_columns = [
        col("fact.id"),
        col("fact.col_1"),
        col("fact.col_2"),
        col("fact.col_3"),
        ...,
        col("dim_1.id_dim_1"),
        # Comes from the externally maintained dimension; the id may not be
        # present yet the first time a fact row is processed.
        col("dim_2.id_dim_2"),
        ...,
        col("fact.col_9"),
        col("fact.col_10"),
        col("fact.inserted_timestamp"),
    ]

    return (
        facts.join(dim_1, on=dim_1_match, how="left")
        .join(dim_2, on=dim_2_match, how="left")
        .select(*output_columns)
    )

# Command 2:
# Definition of the silver streaming table that stores the fact rows.
table_args = dict(
    name=destination_fact,
    table_properties={"quality": "silver"},
    comment="Silver table to store Fact...",
)

dlt.create_streaming_table(**table_args)

# Apply the rows from vw_fact to the fact table as SCD type 1, keyed by "id"
# and sequenced by inserted_timestamp.
dlt.apply_changes(
    target=destination_fact,
    source="vw_fact",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1",
)

# Command 3:
# The intention here is to pick up the Dim 2 IDs that have arrived since the last run and were not yet present during the previous execution of Commands 1 and 2
@dlt.table(name="fix_missing_dim_2")
def fix_missing_dim_2():
    """Return fact rows whose id_dim_2 is still null but can now be resolved.

    Inner-joins the fact table to dim_table_2 and keeps only the rows where
    the fact side has no id_dim_2 while the dimension side has one. This is
    intended to pick up dimension rows that arrived after the fact row was
    first processed.

    Returns:
        DataFrame with the fact id, the id_dim_2 taken from the dimension
        table, and the fact row's inserted_timestamp.
    """
    df_fact = spark.read.table(destination_fact)
    df_dim_2 = spark.read.table(dim_table_2)

    return (
        df_fact.alias("fact")
        .join(
            df_dim_2.alias("dim_2"),
            # The fact frame is aliased "fact"; a "lead" qualifier matches no
            # alias in this join and cannot be resolved.
            # NOTE(review): source_afid / afid presumably correspond to the
            # ref_dim2_code / code_dim_2 columns joined in vw_fact - confirm.
            on=col("fact.source_afid") == col("dim_2.afid"),
            how="inner"
        )
        # Only rows still missing the id on the fact side and resolvable now.
        .filter(col("fact.id_dim_2").isNull() & col("dim_2.id_dim_2").isNotNull())
        .select(
            col("fact.id"),
            col("dim_2.id_dim_2"),
            col("fact.inserted_timestamp")
        )
    )

# Command 4:
# Second apply_changes flow into destination_fact, fed by fix_missing_dim_2,
# using the same keys, sequencing column and SCD type as the flow from vw_fact.
# NOTE(review): no flow name is passed here, so this is presumably the
# statement the "Cannot have multiple queries named ..." error refers to
# (unnamed queries default to the table's name) - confirm the naming
# argument against the apply_changes reference.
dlt.apply_changes(
    target=destination_fact,
    source="fix_missing_dim_2",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1"
)

But this generates the following error:
AnalysisException: Cannot have multiple queries named `catalog`.`schema`.`destination_fact` for `catalog`.`schema`.`destination_fact`. Additional queries on that table must be named. Note that unnamed queries default to the same name as the table

Then I came across "append flows", where the documentation also mentions apply_changes as a "type of flow" and states this:
"...Multiple apply changes flows can target a single streaming table. A streaming table that acts as a target for an apply changes flow can only be targeted by other apply changes flows...."

But there are no concrete examples of its usage.

Any help would be highly appreciated!