<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Re: DLT Pipeline upsert question in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/dlt-pipeline-upsert-question/m-p/119261#M45824</link>
    <description>&lt;P&gt;Hello Lou,&lt;/P&gt;&lt;P&gt;As I mentioned yesterday, thanks to your help I was able to move past the error I had, but now I'm facing the following error:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;cell 28, line 4, in fix_missing_dim_2
    df_fact = dlt.read("fact")
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
pyspark.errors.exceptions.captured.AnalysisException: Failed to read dataset 'catalog.schema.fact'. Dataset is defined in the pipeline but could not be resolved.&lt;/LI-CODE&gt;&lt;P&gt;This is happening in command 3, specifically on this line:&lt;BR /&gt;Line 1&lt;/P&gt;&lt;LI-CODE lang="python"&gt;df_fact = spark.read.table(destination_fact)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;It's as if the pipeline is not recognizing the previous step, where I already executed:&lt;BR /&gt;Line 2&lt;/P&gt;&lt;LI-CODE lang="python"&gt;dlt.create_streaming_table(**table_args)&lt;/LI-CODE&gt;&lt;P&gt;Through trial and error, for the time being I have the 1st line as:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;df_lead_fact = dlt.read("fact")&lt;/LI-CODE&gt;&lt;P&gt;But that is still not working.&lt;/P&gt;&lt;P&gt;Thanks again for your time and help&lt;/P&gt;</description>
    <pubDate>Wed, 14 May 2025 23:39:46 GMT</pubDate>
    <dc:creator>oscarramosp</dc:creator>
    <dc:date>2025-05-14T23:39:46Z</dc:date>
    <item>
      <title>DLT Pipeline upsert question</title>
      <link>https://community.databricks.com/t5/data-engineering/dlt-pipeline-upsert-question/m-p/119082#M45792</link>
      <description>&lt;P&gt;Hello, I'm working on a DLT pipeline to build what would be a data warehouse/data mart. I'm facing issues trying to "update" my fact table when the dimensions maintained outside the pipeline are not up to date at processing time, so on the next run I'm trying to update the dimension reference IDs once I'm able to get a match.&lt;/P&gt;&lt;P&gt;My current approach uses 2 materialized views:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import dlt
from pyspark.sql.functions import col

# Command 1:
@dlt.table
def vw_fact():
    df = spark.read.table("my_view")
    df_dim_1 = spark.read.table(dim_table_1)
    df_dim_2 = spark.read.table(dim_table_2) # Dimension table maintained outside the DLT pipeline

    df_joined = (
        df.alias("fact")
        .join(
            df_dim_1.alias("dim_1"),
            on=col("fact.ref_dim1_code") == col("dim_1.code_dim_1"),
            how="left"
        )
        .join(
            df_dim_2.alias("dim_2"),
            on=col("fact.ref_dim2_code") == col("dim_2.code_dim_2"),
            how="left"
        )
        .select(
            col("fact.id"),
            col("fact.col_1"),
            col("fact.col_2"),
            col("fact.col_3"),
            ...,
            col("dim_1.id_dim_1"), 
            col("dim_2.id_dim_2"), # Externally from the DLT maintained dimension table, id may not be present at the time of the first processing
            ...,
            col("fact.col_9"),
            col("fact.col_10"),
            col("fact.inserted_timestamp")
        )
    )
    
    return df_joined

# Command 2:
table_args = {
    "name": destination_fact,
    "table_properties": {"quality": "silver"},
    "comment": "Silver table to store Fact..."
}

dlt.create_streaming_table(**table_args)

dlt.apply_changes(
    target=destination_fact,
    source="vw_fact",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1"
)

# Command 3:
# The intention here is to pick up the Dim 2 IDs that arrived since the last processing and were not present during the previous execution of Commands 1 and 2
@dlt.table(name="fix_missing_dim_2")
def fix_missing_dim_2():
    df_fact = spark.read.table(destination_fact)
    df_dim_2 = spark.read.table(dim_table_2)

    return (
        df_fact.alias("fact")
        .join(
            df_dim_2.alias("dim_2"),
            on=col("lead.source_afid") == col("dim_2.afid"),
            how="inner"
        )
        .filter(col("fact.id_dim_2").isNull() &amp;amp; col("dim_2.id_dim_2").isNotNull())
        .select(
            col("fact.id"),
            col("dim_2.id_dim_2"),
            col("fact.inserted_timestamp")
        )
    )

# Command 4:
dlt.apply_changes(
    target=destination_fact,
    source="fix_missing_dim_2",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1"
)&lt;/LI-CODE&gt;&lt;P&gt;But this generates the following error:&lt;BR /&gt;AnalysisException: Cannot have multiple queries named `catalog`.`schema`.`destination_fact` for `catalog`.`schema`.`destination_fact`. Additional queries on that table must be named. Note that unnamed queries default to the same name as the table&lt;/P&gt;&lt;P&gt;Then I ran into "append flows", where there is also mention of an apply_changes "type of flow", and the documentation states this:&lt;BR /&gt;"...&lt;SPAN&gt;Multiple apply changes flows can target a single streaming table. A streaming table that acts as a target for an apply changes flow can only be targeted by other apply changes flows...."&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;But there are no concrete examples of its usage.&lt;/P&gt;&lt;P&gt;Any help would be highly appreciated!&lt;/P&gt;</description>
      <pubDate>Tue, 13 May 2025 19:20:14 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dlt-pipeline-upsert-question/m-p/119082#M45792</guid>
      <dc:creator>oscarramosp</dc:creator>
      <dc:date>2025-05-13T19:20:14Z</dc:date>
    </item>
    <item>
      <title>Re: DLT Pipeline upsert question</title>
      <link>https://community.databricks.com/t5/data-engineering/dlt-pipeline-upsert-question/m-p/119092#M45793</link>
      <description>&lt;DIV class="paragraph"&gt;The error encountered, "Cannot have multiple queries named &lt;CODE&gt;catalog&lt;/CODE&gt;.&lt;CODE&gt;schema&lt;/CODE&gt;.&lt;CODE&gt;destination_fact&lt;/CODE&gt; for &lt;CODE&gt;catalog&lt;/CODE&gt;.&lt;CODE&gt;schema&lt;/CODE&gt;.&lt;CODE&gt;destination_fact&lt;/CODE&gt;. Additional queries on that table must be named," arises because Delta Live Tables (DLT) disallows multiple unnamed queries targeting the same table. An unnamed flow defaults to the name of its target table, so two unnamed &lt;CODE&gt;apply_changes&lt;/CODE&gt; calls against the same target collide. To resolve this, you must assign a unique flow name to each &lt;CODE&gt;apply_changes&lt;/CODE&gt; command targeting the same table.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;The Databricks documentation and community examples mention that naming flows explicitly when using &lt;CODE&gt;apply_changes&lt;/CODE&gt; resolves this issue. Here's how this can be implemented:&lt;/DIV&gt;
&lt;H3&gt;Solution: Assign Unique Flow Names&lt;/H3&gt;
&lt;DIV class="paragraph"&gt;Modify the &lt;CODE&gt;apply_changes&lt;/CODE&gt; statements to include a &lt;CODE&gt;flow_name&lt;/CODE&gt; parameter. This parameter uniquely identifies each change flow targeting the same table.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Example:&lt;/DIV&gt;
&lt;PRE&gt;&lt;CODE class="markdown-code-python"&gt;# Apply Changes for Initial Ingestion
dlt.apply_changes(
    target=destination_fact,
    source="vw_fact",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1",
    flow_name="initial_ingestion_flow"
)

# Apply Changes for Fixing Missing Dimension 2 IDs
dlt.apply_changes(
    target=destination_fact,
    source="fix_missing_dim_2",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1",
    flow_name="missing_dim_2_fix_flow"
)&lt;/CODE&gt;&lt;/PRE&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Explanation: By assigning flow names like &lt;CODE&gt;"initial_ingestion_flow"&lt;/CODE&gt; and &lt;CODE&gt;"missing_dim_2_fix_flow"&lt;/CODE&gt;, you ensure that DLT can differentiate between the change flows. This approach aligns with the solution mentioned in community discussions and documentation.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Limitations: - You must name every change flow explicitly to avoid conflicts. - Review pipeline performance to ensure updates are applied efficiently.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Hope this helps, Lou.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;</description>
      <pubDate>Tue, 13 May 2025 19:32:41 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dlt-pipeline-upsert-question/m-p/119092#M45793</guid>
      <dc:creator>Louis_Frolio</dc:creator>
      <dc:date>2025-05-13T19:32:41Z</dc:date>
    </item>
    <item>
      <title>Re: DLT Pipeline upsert question</title>
      <link>https://community.databricks.com/t5/data-engineering/dlt-pipeline-upsert-question/m-p/119094#M45794</link>
      <description>&lt;P&gt;Hello Lou,&lt;/P&gt;&lt;P&gt;Thanks a lot for your reply. I got a new error stating that I was trying to use private preview features. I've switched my pipeline channel to preview to test it and will follow up.&lt;/P&gt;</description>
      <pubDate>Tue, 13 May 2025 19:43:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dlt-pipeline-upsert-question/m-p/119094#M45794</guid>
      <dc:creator>oscarramosp</dc:creator>
      <dc:date>2025-05-13T19:43:33Z</dc:date>
    </item>
    <item>
      <title>Re: DLT Pipeline upsert question</title>
      <link>https://community.databricks.com/t5/data-engineering/dlt-pipeline-upsert-question/m-p/119261#M45824</link>
      <description>&lt;P&gt;Hello Lou,&lt;/P&gt;&lt;P&gt;As I mentioned yesterday, thanks to your help I was able to move past the error I had, but now I'm facing the following error:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;cell 28, line 4, in fix_missing_dim_2
    df_fact = dlt.read("fact")
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
pyspark.errors.exceptions.captured.AnalysisException: Failed to read dataset 'catalog.schema.fact'. Dataset is defined in the pipeline but could not be resolved.&lt;/LI-CODE&gt;&lt;P&gt;This is happening in command 3, specifically on this line:&lt;BR /&gt;Line 1&lt;/P&gt;&lt;LI-CODE lang="python"&gt;df_fact = spark.read.table(destination_fact)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;It's as if the pipeline is not recognizing the previous step, where I already executed:&lt;BR /&gt;Line 2&lt;/P&gt;&lt;LI-CODE lang="python"&gt;dlt.create_streaming_table(**table_args)&lt;/LI-CODE&gt;&lt;P&gt;Through trial and error, for the time being I have the 1st line as:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;df_lead_fact = dlt.read("fact")&lt;/LI-CODE&gt;&lt;P&gt;But that is still not working.&lt;/P&gt;&lt;P&gt;Thanks again for your time and help&lt;/P&gt;</description>
      <pubDate>Wed, 14 May 2025 23:39:46 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dlt-pipeline-upsert-question/m-p/119261#M45824</guid>
      <dc:creator>oscarramosp</dc:creator>
      <dc:date>2025-05-14T23:39:46Z</dc:date>
    </item>
  </channel>
</rss>

