<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: DLT Append Flow Parameterization in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/dlt-append-flow-parameterization/m-p/118370#M45633</link>
    <description>&lt;P&gt;Hi Big Roux,&lt;/P&gt;&lt;P&gt;Following up on my previous response, I think it might help if I describe the situation more clearly.&lt;/P&gt;&lt;P&gt;The pipeline starts by using read_stream on a Delta table and creating a view, without applying any watermark.&lt;BR /&gt;The next stage uses create_streaming_table and append_flow. I'm reading from different sources and creating a separate view for each, but I need to aggregate the results after applying some transformations to these sources.&lt;/P&gt;&lt;P&gt;Delta_tables -&amp;gt; Views -&amp;gt; Streaming_table(append_flow)&lt;/P&gt;&lt;P&gt;In this case, do I need to apply the watermark in the first step, when I read the stream from my actual source (the Delta table)?&lt;/P&gt;&lt;P&gt;I hope this gives you a clearer picture of how my current pipeline works.&lt;/P&gt;&lt;P&gt;Thank you.&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;&lt;P&gt;Dejian&lt;/P&gt;</description>
    <pubDate>Thu, 08 May 2025 04:00:45 GMT</pubDate>
    <dc:creator>Dejian</dc:creator>
    <dc:date>2025-05-08T04:00:45Z</dc:date>
    <item>
      <title>DLT Append Flow Parameterization</title>
      <link>https://community.databricks.com/t5/data-engineering/dlt-append-flow-parameterization/m-p/117105#M45427</link>
      <description>&lt;P&gt;Hi All,&lt;/P&gt;&lt;P&gt;I'm currently using a DLT append flow to merge multiple streaming flows into one output.&lt;BR /&gt;When I try to turn the append flow into a dynamic function for scalability, the DLT append flow raises errors.&lt;/P&gt;&lt;PRE&gt;stat_table = f"{catalog}.{bronze_schema}.output"

dlt.create_streaming_table(
    name = stat_table
)

def append_flow(stat_table, source):
    @dlt.append_flow(target = f"{stat_table}", name = f"{source}_flow")
    def topic_flow(topic = source):
        return (
            dlt.read_stream(f"{source}")
        )

list = ['table1', 'table2', 'table3']
for source in list:
    append_flow(stat_table, source)&lt;/PRE&gt;&lt;P&gt;The {source} is another DLT view within the same DLT pipeline.&lt;/P&gt;&lt;P&gt;The error message:&lt;BR /&gt;Flow 'dejian.test.table3_flow' could not be planned in append mode, but there are multiple flows writing to its destination `dejian`.`bronze`.`__materialization_mat_2d2a9216_c0f8_4ca4_ad69_11fec5c94151_output_1`. Starting in complete mode will cause results to be overwritten. Please edit the flow definition to allow for append mode.&lt;BR /&gt;Append mode error (full traces in the driver logs):&lt;BR /&gt;[STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION] Invalid streaming output mode: append. This output mode is not supported for streaming aggregations without watermark on streaming DataFrames/DataSets. SQLSTATE: 42KDE&lt;/P&gt;&lt;P&gt;I tried using watermarks, but that did not seem to work either.&lt;/P&gt;&lt;P&gt;I saw an example in the documentation that loops over different Kafka topics as sources. Does this pattern support non-Kafka sources as well?&lt;/P&gt;&lt;P&gt;Please advise.&lt;BR /&gt;Thank you.&lt;/P&gt;</description>
      <pubDate>Wed, 30 Apr 2025 08:25:28 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dlt-append-flow-parameterization/m-p/117105#M45427</guid>
      <dc:creator>Dejian</dc:creator>
      <dc:date>2025-04-30T08:25:28Z</dc:date>
    </item>
    <item>
      <title>Re: DLT Append Flow Parameterization</title>
      <link>https://community.databricks.com/t5/data-engineering/dlt-append-flow-parameterization/m-p/117193#M45453</link>
      <description>&lt;DIV class="paragraph"&gt;The error you're encountering occurs because Delta Live Tables (DLT) append flows currently do not support streaming aggregations or other transformations on streaming DataFrames unless a watermark is applied properly to handle late data. Based on your query and the provided context, let’s carefully address the issues and clarify some points:&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;1. &lt;STRONG&gt;Flow Planning in Append Mode&lt;/STRONG&gt;: The error indicates that DLT is unable to compute a plan for append mode because multiple flows are writing to the same target table. However, the target table needs clearly defined appendable data that streaming transformations can process incrementally.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;2. Watermarking&lt;/STRONG&gt;: The error message indicates the necessity to use watermarks effectively to support append mode for cases involving streaming aggregations. Watermarks are essential to manage event-time-based processing in order to drop late events and enable structured streaming to process data incrementally.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Points to Address: Dynamic Function in DLT Append Flows - The example in the documentation supports looping through multiple topics for Kafka sources, and similar patterns should also apply to non-Kafka sources. Here's how to dynamically create &lt;CODE&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97035"&gt;@Dlt&lt;/a&gt;.append_flow&lt;/CODE&gt; for each source, adapting your logic for non-Kafka scenarios.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;#### Recommendations: 1. &lt;STRONG&gt;Ensure Watermark is Applied to Source Streams&lt;/STRONG&gt;: Add &lt;CODE&gt;.withWatermark(event_time_column, "time_limit")&lt;/CODE&gt; to your streaming DataFrame of each source to manage late-arrival data. Watermarking is critical for streaming aggregations.&lt;/DIV&gt;
&lt;OL start="2"&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Refactor Code for Dynamic &lt;CODE&gt;@append_flow&lt;/CODE&gt; Definitions&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Move your logic into individual &lt;CODE&gt;@append_flow&lt;/CODE&gt;-decorated functions for each source, ensuring they return a DataFrame compatible with append mode.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Avoid Multiple Writers to the Same Output Using &lt;CODE&gt;@append_flow&lt;/CODE&gt;&lt;/STRONG&gt;:
&lt;UL&gt;
&lt;LI&gt;Ensure each &lt;CODE&gt;@append_flow&lt;/CODE&gt; writes to distinct portions of data in the target table, or refactor the pipeline to combine streams upstream if possible.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/OL&gt;
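&lt;DIV class="paragraph"&gt;When &lt;CODE&gt;@append_flow&lt;/CODE&gt; functions are generated in a loop, Python's late binding of closure variables matters. The pure-Python sketch below uses two hypothetical helpers:&lt;/DIV&gt;
&lt;UL&gt;
&lt;LI&gt;&lt;CODE&gt;register_flow&lt;/CODE&gt;, a decorator that only records each function.&lt;/LI&gt;
&lt;LI&gt;&lt;CODE&gt;resolve&lt;/CODE&gt;, a step that calls the recorded functions after all definitions have run.&lt;/LI&gt;
&lt;/UL&gt;
&lt;DIV class="paragraph"&gt;This assumes, without confirming it here, that the pipeline also calls flow functions only after the loop has finished. Flows defined directly in the loop all see the last source, while flows defined inside a helper function each keep their own.&lt;/DIV&gt;
&lt;PRE&gt;
```python
REGISTRY = {}  # hypothetical stand-in for the pipeline's flow registry


def register_flow(name):
    """Hypothetical decorator: records the function but does not call it yet."""
    def decorator(func):
        REGISTRY[name] = func
        return func
    return decorator


def resolve():
    # Call every registered flow only after all definitions have run
    return {flow_name: func() for flow_name, func in REGISTRY.items()}


sources = ["table1", "table2", "table3"]

# Pitfall: the closure looks up `source` when called, after the loop has ended
for source in sources:
    @register_flow(name=f"loop_{source}_flow")
    def loop_flow():
        return source


# Fix: a helper function gives each flow its own `source` binding
def create_flow(source):
    @register_flow(name=f"helper_{source}_flow")
    def helper_flow():
        return source


for src in sources:
    create_flow(src)

# loop_* flows all return 'table3'; helper_* flows return their own source
print(resolve())
```
&lt;/PRE&gt;
&lt;DIV class="paragraph"&gt;The helper function in the original question already follows the safe pattern.&lt;/DIV&gt;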
&lt;DIV class="paragraph"&gt;Example Code for Non-Kafka Source Streams: Below is the revised version of your pipeline, incorporating watermarks and avoiding the issue of multiple flows writing to the same target without clear partitioning logic:&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;```python import dlt from pyspark.sql.functions import col&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;# Define the catalog, schema, and output table catalog = "&amp;lt;catalog&amp;gt;" bronze_schema = "&amp;lt;bronze_schema&amp;gt;" stat_table = f"{catalog}.{bronze_schema}.output"&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;# Create the target streaming table dlt.create_streaming_table(name=stat_table)&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;# Function to create dynamic flows with watermarks for each source list = ['table1', 'table2', 'table3'] # Replace this with your actual sources&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;for source in list: &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97035"&gt;@Dlt&lt;/a&gt;.append_flow(target=stat_table, name=f"{source}_flow") def process_source(): return dlt.read_stream(f"{source}").withWatermark("event_time_column", "10 minutes").select("*") &lt;CODE&gt;``
Here are the key highlights of this approach:
- A &lt;/CODE&gt;withWatermark&lt;CODE&gt; is applied to every source, using an appropriate column (e.g., &lt;/CODE&gt;event_time_column&lt;CODE&gt;) and a time limit like "10 minutes" to ensure support for append mode.
- Each flow has a unique name (e.g., &lt;/CODE&gt;"{source}_flow"`), ensuring that checkpoints are distinct and consistent with the flow definition.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Additional Notes: - The provided solution is scalable, as each additional source automatically generates a new &lt;CODE&gt;@append_flow&lt;/CODE&gt; without conflict. - If your sources don’t have &lt;CODE&gt;event_time&lt;/CODE&gt; columns, consider generating a pseudo-event time column for them based on ingestion timestamps.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Reference Documentation: - Delta Live Tables append flow documentation mentions the importance of watermarks and dynamic &lt;CODE&gt;@append_flow&lt;/CODE&gt; processing. - Examples of handling multiple flows to a single target are also described in other contexts.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Hope this helps, Big Roux.&lt;/DIV&gt;</description>
      <pubDate>Wed, 30 Apr 2025 18:20:46 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dlt-append-flow-parameterization/m-p/117193#M45453</guid>
      <dc:creator>Louis_Frolio</dc:creator>
      <dc:date>2025-04-30T18:20:46Z</dc:date>
    </item>
    <item>
      <title>Re: DLT Append Flow Parameterization</title>
      <link>https://community.databricks.com/t5/data-engineering/dlt-append-flow-parameterization/m-p/118369#M45632</link>
      <description>&lt;P&gt;Hi Big Roux,&lt;/P&gt;&lt;P&gt;Thank you for your explanation and sample code.&lt;/P&gt;&lt;P&gt;However, I tested your code and I am still getting the same error:&lt;BR /&gt;&lt;SPAN&gt;[STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION] Invalid streaming output mode: append. This output mode is not supported for streaming aggregations without watermark on streaming DataFrames/DataSets. SQLSTATE: 42KDE&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;I applied the watermark when reading the source view.&lt;BR /&gt;Are there any other criteria that need to be applied to the&amp;nbsp;&lt;SPAN&gt;f"{source}"&lt;/SPAN&gt; table so that the append_flow works?&lt;/P&gt;&lt;P&gt;Thank you.&lt;/P&gt;</description>
      <pubDate>Thu, 08 May 2025 03:46:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dlt-append-flow-parameterization/m-p/118369#M45632</guid>
      <dc:creator>Dejian</dc:creator>
      <dc:date>2025-05-08T03:46:33Z</dc:date>
    </item>
    <item>
      <title>Re: DLT Append Flow Parameterization</title>
      <link>https://community.databricks.com/t5/data-engineering/dlt-append-flow-parameterization/m-p/118370#M45633</link>
      <description>&lt;P&gt;Hi Big Roux,&lt;/P&gt;&lt;P&gt;Following up on my previous response, I think it might help if I describe the situation more clearly.&lt;/P&gt;&lt;P&gt;The pipeline starts by using read_stream on a Delta table and creating a view, without applying any watermark.&lt;BR /&gt;The next stage uses create_streaming_table and append_flow. I'm reading from different sources and creating a separate view for each, but I need to aggregate the results after applying some transformations to these sources.&lt;/P&gt;&lt;P&gt;Delta_tables -&amp;gt; Views -&amp;gt; Streaming_table(append_flow)&lt;/P&gt;&lt;P&gt;In this case, do I need to apply the watermark in the first step, when I read the stream from my actual source (the Delta table)?&lt;/P&gt;&lt;P&gt;I hope this gives you a clearer picture of how my current pipeline works.&lt;/P&gt;&lt;P&gt;Thank you.&lt;/P&gt;&lt;P&gt;Regards,&lt;/P&gt;&lt;P&gt;Dejian&lt;/P&gt;</description>
      <pubDate>Thu, 08 May 2025 04:00:45 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/dlt-append-flow-parameterization/m-p/118370#M45633</guid>
      <dc:creator>Dejian</dc:creator>
      <dc:date>2025-05-08T04:00:45Z</dc:date>
    </item>
  </channel>
</rss>

