cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Consume updated data from the Materialized view and send it as append to a streaming table

WearBeard
New Contributor

Hello everyone! I'm using DLT and I'm pretty new to them. I'm trying to take the updates from a materialized view and send them to a streaming table as an append.
For example, if I have a MV of 400 records, I want an append to be made to the streaming table and when the view is updated again, an append to the streaming table is made again, always having a total of 400 in the MV and the streaming rate increases, in this case it would reach 800 for the two MV updates.

I'm currently trying to do it and I have this error:

 

 

 

Flow 'streaming_table_test' has FAILED fatally. An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert table streaming_table_test to a live table instead of a streaming live table. To resolve this issue, perform a Full Refresh to table streaming_table_test. A Full Refresh will attempt to clear all data from table kafka_events and then load all data from the streaming source. The non-append change can be found at version 5192. 

 

 

 

 

1 REPLY 1

Priyanka_Biswas
Valued Contributor
Valued Contributor

Hi @WearBeard By default, streaming tables require append-only sources. The encountered error is due to an update or delete operation on the 'streaming_table_test'. To fix this issue, perform a Full Refresh on the 'streaming_table_test' table.

You can use the @append_flow decorator to write to a streaming table from multiple streaming sources to do the following:

  • Add and remove streaming sources that append data to an existing streaming table without requiring a full refresh. For example, you might have a table that combines regional data from every region you’re operating in. As new regions are rolled out, you can add the new region data to the table without performing a full refresh.

  • Update a streaming table by appending missing historical data (backfilling). For example, you have an existing streaming table that is written to by an Apache Kafka topic. You also have historical data stored in a table that you need inserted exactly once into the streaming table and you cannot stream the data because you need to perform a complex aggregation before inserting the data.

The following is the syntax for @append_flow:

import dlt

dlt.create_streaming_table("<target-table-name>")

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment") # optional
def <function-name>():
  return (<streaming query>)


If you plan to delete or update rows in the source table in the future, consider converting 'streaming_table_test' to a live table. 
Ref: https://docs.databricks.com/en/delta-live-tables/python-ref.html

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.