
DLT Pipeline upsert question

oscarramosp
New Contributor II

Hello, I'm working on a DLT pipeline to build what would be a data warehouse/data mart. I'm facing issues trying to "update" my fact table when the dimensions that are maintained outside the pipeline are not up to date at processing time, so on the next run I try to "update the dimension reference IDs" once I'm able to get a match.

My current approach uses two materialized views:

# Command 1:
import dlt
from pyspark.sql.functions import col

@dlt.table
def vw_fact():
    df = spark.read.table("my_view")
    df_dim_1 = spark.read.table(dim_table_1)
    df_dim_2 = spark.read.table(dim_table_2)  # dimension table maintained outside the DLT pipeline

    df_joined = (
        df.alias("fact")
        .join(
            df_dim_1.alias("dim_1"),
            on=col("fact.ref_dim1_code") == col("dim_1.code_dim_1"),
            how="left"
        )
        .join(
            df_dim_2.alias("dim_2"),
            on=col("fact.ref_dim2_code") == col("dim_2.code_dim_2"),
            how="left"
        )
        .select(
            col("fact.id"),
            col("fact.col_1"),
            col("fact.col_2"),
            col("fact.col_3"),
            ...,
            col("dim_1.id_dim_1"), 
            col("dim_2.id_dim_2"), # Externally from the DLT maintained dimension table, id may not be present at the time of the first processing
            ...,
            col("fact.col_9"),
            col("fact.col_10"),
            col("fact.inserted_timestamp")
        )
    )
    
    return df_joined

# Command 2:
table_args = {
    "name": destination_fact,
    "table_properties": {"quality": "silver"},
    "comment": "Silver table to store Fact..."
}

dlt.create_streaming_table(**table_args)

dlt.apply_changes(
    target = destination_fact,
    source = "vw_fact",
    keys = ["id"],
    sequence_by= col('inserted_timestamp'),
    stored_as_scd_type = "1"
)

# Command 3:
# The intention here is to pick up the Dim 2 IDs that arrived since the last run and were not present when Commands 1 and 2 last executed
@dlt.table(name="fix_missing_dim_2")
def fix_missing_dim_2():
    df_fact = spark.read.table(destination_fact)
    df_dim_2 = spark.read.table(dim_table_2)

    return (
        df_fact.alias("fact")
        .join(
            df_dim_2.alias("dim_2"),
            on=col("lead.source_afid") == col("dim_2.afid"),
            how="inner"
        )
        .filter(col("fact.id_dim_2").isNull() & col("dim_2.id_dim_2").isNotNull())
        .select(
            col("fact.id"),
            col("dim_2.id_dim_2"),
            col("fact.inserted_timestamp")
        )
    )

# Command 4:
dlt.apply_changes(
    target=destination_fact,
    source="fix_missing_dim_2",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1"
)

But this generates the following error:
AnalysisException: Cannot have multiple queries named `catalog`.`schema`.`destination_fact` for `catalog`.`schema`.`destination_fact`. Additional queries on that table must be named. Note that unnamed queries default to the same name as the table

Then I ran into "append flows", where apply_changes is also mentioned as a type of flow, and the documentation states this:
"...Multiple apply changes flows can target a single streaming table. A streaming table that acts as a target for an apply changes flow can only be targeted by other apply changes flows...."

But there are no concrete examples of its usage.
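
For reference, this is roughly what the append flow pattern from that page looks like; a minimal sketch only, where the target and source table names are placeholders rather than my real objects:

import dlt

# Sketch: two append flows writing into the same streaming table.
# "union_target", "source_table_a" and "source_table_b" are placeholder names.
dlt.create_streaming_table("union_target")

@dlt.append_flow(target="union_target", name="flow_from_source_a")
def flow_from_source_a():
    return spark.readStream.table("source_table_a")

@dlt.append_flow(target="union_target", name="flow_from_source_b")
def flow_from_source_b():
    return spark.readStream.table("source_table_b")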

Any help would be highly appreciated!

3 REPLIES

BigRoux
Databricks Employee
The error encountered, "Cannot have multiple queries named catalog.schema.destination_fact for catalog.schema.destination_fact. Additional queries on that table must be named," arises because Delta Live Tables (DLT) disallows multiple unnamed queries targeting the same table. This limitation stems from how DLT processes and manages tables within a pipeline. To resolve this, you must assign unique flow names to each apply_changes command targeting the same table.
 
The Databricks documentation and community examples mention that naming flows explicitly when using apply_changes resolves this issue. Here's how this can be implemented:

Solution: Assign Unique Flow Names

Modify the apply_changes statements to include a flow_name parameter. This parameter uniquely identifies each change flow targeting the same table.
Example:
# Apply Changes for Initial Ingestion
dlt.apply_changes(
    target=destination_fact,
    source="vw_fact",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1",
    flow_name="initial_ingestion_flow"
)

# Apply Changes for Fixing Missing Dimension 2 IDs
dlt.apply_changes(
    target=destination_fact,
    source="fix_missing_dim_2",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1",
    flow_name="missing_dim_2_fix_flow"
)
 
Explanation: By assigning flow names like "initial_ingestion_flow" and "missing_dim_2_fix_flow", you ensure that DLT can differentiate between the change flows. This approach aligns with the solution mentioned in community discussions and documentation.
 
Limitations:
- You must name every change flow explicitly to avoid conflicts.
- Review pipeline performance to ensure updates are applied efficiently.
 
Hope this helps, Lou.
 

oscarramosp
New Contributor II

Hello Lou,

Thanks a lot for your reply. I got a new error stating that I was trying to use private preview features, so I've switched my pipeline to the preview channel to test it and will follow up.

oscarramosp
New Contributor II

Hello Lou,

Like I mentioned yesterday, thanks to your help I was able to move forward from the error I had, but now I'm facing the following error:

cell 28, line 4, in fix_missing_dim_2
    df_fact = dlt.read("fact")
pyspark.errors.exceptions.captured.AnalysisException: Failed to read dataset 'catalog.schema.fact'. Dataset is defined in the pipeline but could not be resolved.

This is happening in Command 3, specifically on this line:

df_fact = spark.read.table(destination_fact)

It's as if the pipeline is not recognizing the previous step where I already executed:

dlt.create_streaming_table(**table_args)

Through trial and error, for the time being I have that first line as:

df_lead_fact = dlt.read("fact")

But that is still not working.

Thanks again for your time and help
