Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT Append Flow Parameterization

Dejian
New Contributor II

Hi All,

I'm currently using a DLT append flow to merge multiple streaming flows into one output table.
While trying to turn the append flow into a dynamic function for scalability, I'm running into errors.

```python
stat_table = f"{catalog}.{bronze_schema}.output"

dlt.create_streaming_table(
    name=stat_table
)

def append_flow(stat_table, source):
    @dlt.append_flow(target=f"{stat_table}", name=f"{source}_flow")
    def topic_flow(topic=source):
        return (
            dlt.read_stream(f"{source}")
        )

list = ['table1', 'table2', 'table3']
for source in list:
    append_flow(stat_table, source)
```
 
Each {source} is another DLT view within the same DLT pipeline.
 
The error message:
Flow 'dejian.test.table3_flow' could not be planned in append mode, but there are multiple flows writing to its destination `dejian`.`bronze`.`__materialization_mat_2d2a9216_c0f8_4ca4_ad69_11fec5c94151_output_1`. Starting in complete mode will cause results to be overwritten. Please edit the flow definition to allow for append mode.
Append mode error (full traces in the driver logs):
[STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION] Invalid streaming output mode: append. This output mode is not supported for streaming aggregations without watermark on streaming DataFrames/DataSets. SQLSTATE: 42KDE

I tried using watermarks, but that did not seem to work either.
 
I saw an example in the documentation that loops over different Kafka topics as sources; does this pattern support non-Kafka sources as well?

Please advise.
Thank you.
3 REPLIES

BigRoux
Databricks Employee
The error you're encountering occurs because Delta Live Tables (DLT) append flows currently do not support streaming aggregations or other transformations on streaming DataFrames unless a watermark is applied properly to handle late data. Based on your query and the provided context, let’s carefully address the issues and clarify some points:
 
1. Flow Planning in Append Mode: The error indicates that DLT cannot plan the flow in append mode while multiple flows write to the same target table. Each flow writing to the target must produce output that can be appended incrementally; a streaming aggregation without a watermark cannot, so DLT would have to fall back to complete mode, which would overwrite the results of the other flows.
 
2. Watermarking: The error message points to the need for watermarks when streaming aggregations are involved. Watermarks are essential for event-time-based processing: they let Structured Streaming drop late events and emit aggregation results incrementally, which is what append mode requires.
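For illustration, here is a minimal standalone sketch of that requirement (the table and column names are placeholders, not taken from your pipeline): a streaming aggregation only supports append output mode when a watermark bounds event-time lateness and the grouping includes the watermarked time window.

```python
from pyspark.sql import functions as F

# Minimal sketch (placeholder names): without the withWatermark call below,
# this aggregation cannot run in append output mode and raises
# STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION.
events = spark.readStream.table("catalog.schema.events")  # placeholder source

windowed_counts = (
    events
    .withWatermark("event_time", "10 minutes")       # bound how late data can arrive
    .groupBy(F.window("event_time", "5 minutes"))    # group on the event-time window
    .count()
)
```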
 
Points to Address: Dynamic Function in DLT Append Flows - The example in the documentation loops through multiple topics for Kafka sources, and similar patterns should also apply to non-Kafka sources. Here's how to dynamically create @dlt.append_flow for each source, adapting your logic for non-Kafka scenarios.
#### Recommendations:
1. Ensure a Watermark is Applied to Source Streams: Add `.withWatermark(event_time_column, "time_limit")` to the streaming DataFrame of each source to manage late-arriving data. Watermarking is critical for streaming aggregations.
2. Refactor Code for Dynamic @append_flow Definitions: Move your logic into individual @append_flow-decorated functions for each source, ensuring they return a DataFrame compatible with append mode.
3. Avoid Multiple Writers to the Same Output Using @append_flow: Ensure each @append_flow writes to distinct portions of data in the target table, or refactor the pipeline to combine streams upstream if possible.
Example Code for Non-Kafka Source Streams: Below is the revised version of your pipeline, incorporating watermarks and avoiding the issue of multiple flows writing to the same target without clear partitioning logic:
 
```python
import dlt
from pyspark.sql.functions import col

# Define the catalog, schema, and output table
catalog = "<catalog>"
bronze_schema = "<bronze_schema>"
stat_table = f"{catalog}.{bronze_schema}.output"

# Create the target streaming table
dlt.create_streaming_table(name=stat_table)

# Dynamically create an append flow with a watermark for each source
sources = ['table1', 'table2', 'table3']  # Replace this with your actual sources

def create_append_flow(source):
    # Factory function so each flow captures its own source value
    @dlt.append_flow(target=stat_table, name=f"{source}_flow")
    def process_source():
        return (
            dlt.read_stream(source)
            .withWatermark("event_time_column", "10 minutes")
            .select("*")
        )

for source in sources:
    create_append_flow(source)
```

Here are the key highlights of this approach:
- A `withWatermark` is applied to every source, using an appropriate column (e.g., `event_time_column`) and a time limit like "10 minutes" to ensure support for append mode.
- Each flow has a unique name (e.g., `f"{source}_flow"`), ensuring that checkpoints are distinct and consistent with the flow definition.
 
Additional Notes:
- The provided solution is scalable, as each additional source automatically generates a new @append_flow without conflict.
- If your sources don't have event-time columns, consider generating a pseudo-event-time column for them based on ingestion timestamps.
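As a rough sketch of the pseudo-event-time idea (assuming a source with no event-time column; the `ingest_time` column name is made up for illustration):

```python
import dlt
from pyspark.sql.functions import current_timestamp

# Hypothetical sketch: derive a pseudo event-time column from the ingestion
# timestamp so a watermark can still be applied for append mode.
@dlt.append_flow(target=stat_table, name="table1_flow")
def table1_flow():
    return (
        dlt.read_stream("table1")
        .withColumn("ingest_time", current_timestamp())   # pseudo event time
        .withWatermark("ingest_time", "10 minutes")
    )
```

Any downstream aggregation would then need to group on a window over that pseudo event-time column for append mode to remain valid.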
Reference Documentation:
- The Delta Live Tables append flow documentation covers the importance of watermarks and dynamic @append_flow processing.
- Examples of handling multiple flows writing to a single target are also described in other contexts.
 
Hope this helps, Big Roux.

Dejian
New Contributor II

Hi Big Roux,

Thank you for your explanation and sample code.

However, I tested your code and I'm still getting the same error:
[STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION] Invalid streaming output mode: append. This output mode is not supported for streaming aggregations without watermark on streaming DataFrames/DataSets. SQLSTATE: 42KDE

I have applied the watermarking when reading the source view table.
Are there any other criteria that need to be applied to the f"{source}" table so that the append_flow would work?

Thank you.

Dejian
New Contributor II

Hi Big Roux,

Following up on my previous response, I think it might help if I describe the situation more clearly.

The pipeline starts with a read_stream from a Delta table and creates a view without applying watermarking.
The next stage uses create_streaming_table and the append_flow. I'm reading from different sources and creating separate views, but I need to aggregate the results after doing some transformations on these sources.

Delta_tables -> Views -> Streaming_table(append_flow)
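To make that concrete, here is a simplified sketch of the current structure (the table, view, and column names are placeholders, not my real ones):

```python
import dlt

# Stage 1: stream from the Delta table into a view, currently without a watermark
@dlt.view(name="source_view_1")
def source_view_1():
    return spark.readStream.table("catalog.schema.delta_table_1")  # placeholder

# Stage 2: streaming table fed by append flows that transform/aggregate the views
dlt.create_streaming_table(name="output")

@dlt.append_flow(target="output", name="source_view_1_flow")
def source_view_1_flow():
    # the aggregation that triggers the append-mode error happens here
    return dlt.read_stream("source_view_1").groupBy("some_key").count()
```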

In this case, do I need to apply the watermarking in the first step, when I read stream from my actual source (the Delta table)?

I hope this gives you a clearer picture of how my current pipeline works.

Thank you.

Regards,

Dejian
