Data Engineering

Databricks SQL - Deduplication in DLT APPLY CHANGES INTO

JakeerDE
New Contributor III

Hi @Kaniz_Fatma,

We have a Kafka source appending data to a bronze table, followed by a DLT APPLY CHANGES INTO step that handles the SCD logic. Finally, materialized views build the dims/facts.

We run into an error when we deduplicate inside APPLY CHANGES INTO, as shown below.

Can someone help me understand what I am missing here and how to resolve it?

We are using Databricks SQL.

APPLY CHANGES INTO LIVE.targettable
FROM
(
SELECT DISTINCT * FROM STREAM(sourcetable)
)
KEYS (col1, col2)
IGNORE NULL UPDATES
SEQUENCE BY (createddate)
STORED AS
SCD TYPE 2;

Error

org.apache.spark.sql.catalyst.ExtendedAnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;

Accepted Solution

JakeerDE
New Contributor III

I resolved this use case by removing the DISTINCT keyword and adding TRACK HISTORY ON to the APPLY CHANGES INTO statement.

APPLY CHANGES INTO LIVE.targettable
FROM ( SELECT * 
FROM STREAM(sourcetable_1) tbl1
INNER JOIN STREAM(sourcetable_2) tbl2 ON tbl1.id = tbl2.id
WITH WATERMARK AS (tbl1.createddate))
KEYS (col1, col2) IGNORE NULL UPDATES SEQUENCE BY (createddate) STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (col1,col2);

Hope this helps someone.

Thanks.
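
For readers wondering why tracking history on the non-key columns can stand in for DISTINCT, here is a minimal sketch in plain Python. It is a simplified model, not how DLT implements APPLY CHANGES INTO: it assumes a new SCD Type 2 version is opened only when at least one tracked column differs from the currently open version. The function name `apply_scd2`, the `status` column, and the sample events are hypothetical; `__START_AT` and `__END_AT` mirror the validity columns DLT adds to SCD Type 2 targets.

```python
def apply_scd2(events, keys, sequence_by, tracked):
    """Build SCD Type 2 versions from change events (simplified model)."""
    history = []   # every version, closed or still open
    current = {}   # key tuple -> currently open version for that key
    for ev in sorted(events, key=lambda e: e[sequence_by]):
        k = tuple(ev[c] for c in keys)
        open_row = current.get(k)
        # Assumption: an event that changes no tracked column opens no new version.
        if open_row is not None and all(open_row[c] == ev[c] for c in tracked):
            continue
        if open_row is not None:
            open_row["__END_AT"] = ev[sequence_by]  # close the previous version
        new_row = dict(ev, __START_AT=ev[sequence_by], __END_AT=None)
        history.append(new_row)
        current[k] = new_row
    return history


# Hypothetical events: the second one is an exact duplicate of the first.
events = [
    {"col1": 1, "col2": "a", "status": "new", "createddate": 1},
    {"col1": 1, "col2": "a", "status": "new", "createddate": 1},
    {"col1": 1, "col2": "a", "status": "shipped", "createddate": 2},
]

# Equivalent in spirit to TRACK HISTORY ON * EXCEPT (col1, col2).
history = apply_scd2(
    events,
    keys=["col1", "col2"],
    sequence_by="createddate",
    tracked=["status", "createddate"],
)
```

Under this model the duplicate event produces no extra row, so `history` holds two versions: one closed at `createddate` 2 and one still open. Whether real pipelines treat same-sequence duplicates exactly this way is an assumption worth checking against your own target table.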


3 REPLIES

Palash01
Contributor III

Hey @JakeerDE 

The error message "Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark" indicates that the DISTINCT operation in your query acts as a streaming aggregation, and a streaming aggregation needs a watermark when the query runs in append output mode, as it does in APPLY CHANGES INTO. A watermark tracks event-time progress in the stream and determines when data is old enough that late records can be dropped and the related state discarded.
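
To make the role of the watermark concrete, here is a minimal sketch in plain Python of a DISTINCT over an unbounded stream. It is a toy model, not Spark's implementation: the function name `distinct_with_watermark`, the 10-minute delay, and the sample rows are all assumptions for illustration. Without the eviction step, the set of rows seen so far would grow forever, which is why the engine refuses to run the aggregation in append mode without a watermark.

```python
from datetime import datetime, timedelta


def distinct_with_watermark(events, event_time, delay):
    """Yield each distinct row once, keeping only bounded state."""
    seen = {}        # row fingerprint -> event time of the emitted row
    max_time = None  # latest event time observed so far
    for row in events:
        ts = row[event_time]
        max_time = ts if max_time is None else max(max_time, ts)
        watermark = max_time - delay
        if ts < watermark:
            continue  # row arrived too late: dropped
        # Evict state older than the watermark; it can no longer match
        # any row that would still be accepted.
        for fp in [fp for fp, t in seen.items() if t < watermark]:
            del seen[fp]
        fingerprint = tuple(sorted(row.items()))  # DISTINCT * compares whole rows
        if fingerprint not in seen:
            seen[fingerprint] = ts
            yield row


t0 = datetime(2024, 1, 1, 12, 0)
events = [
    {"col1": 1, "col2": "a", "createddate": t0},
    {"col1": 1, "col2": "a", "createddate": t0},                         # exact duplicate
    {"col1": 2, "col2": "b", "createddate": t0 + timedelta(minutes=5)},
    {"col1": 1, "col2": "a", "createddate": t0 - timedelta(hours=1)},    # too late
]

result = list(distinct_with_watermark(events, "createddate", timedelta(minutes=10)))
```

Here `result` contains two rows: the duplicate is suppressed by the state, and the hour-old row is dropped because it falls behind the watermark.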

Potential Solutions: 

  • Add a watermark to your streaming source using a WITH WATERMARK AS clause so that it applies before the DISTINCT operation. This tracks the event time of your data and enables append mode. Example:
APPLY CHANGES INTO LIVE.targettable
FROM ( SELECT DISTINCT * FROM STREAM(sourcetable)
WITH WATERMARK AS (createddate) ) -- Assuming 'createddate' is your event time column
KEYS (col1, col2) IGNORE NULL UPDATES SEQUENCE BY (createddate) STORED AS SCD TYPE 2;
  • Windowing with ROW_NUMBER: Number the rows within a window partitioned by col1 and col2, ordered by createddate descending, and keep the latest record in each group:
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY col1, col2 ORDER BY createddate DESC) AS row_num FROM my_table ) AS t WHERE row_num = 1;
  • Custom Deduplication Logic: Implement your own deduplication logic using conditional filters or user-defined functions based on specific requirements.
SELECT * FROM my_table WHERE NOT EXISTS ( SELECT 1 FROM my_table AS t2 WHERE t2.col1 = my_table.col1 AND t2.col2 = my_table.col2 AND t2.createddate > my_table.createddate );
  • Drop duplicates using PySpark:
df_deduplicated_multi = df.dropDuplicates(["id", "date"])
df_deduplicated_multi.show()
  • If append mode isn't essential, explore other output modes supported with streaming aggregations (e.g., complete or update). However, consider the implications on your downstream data consumption or table structure.
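
As a quick way to sanity-check the ROW_NUMBER and NOT EXISTS suggestions above on a plain (non-streaming) table, here is a small sketch that runs both queries against an in-memory SQLite database from Python. This is only a stand-in for a batch table, not Databricks or DLT; the `payload` column and the sample rows are hypothetical, and the window function needs SQLite 3.25 or newer.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE my_table (col1 INTEGER, col2 TEXT, createddate TEXT, payload TEXT)"
)
conn.executemany(
    "INSERT INTO my_table VALUES (?, ?, ?, ?)",
    [
        (1, "a", "2024-01-01", "old"),
        (1, "a", "2024-01-02", "new"),
        (2, "b", "2024-01-01", "only"),
    ],
)

# Keep the latest row per (col1, col2) by ranking rows inside each group.
row_number_sql = """
SELECT col1, col2, createddate, payload FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY col1, col2 ORDER BY createddate DESC
    ) AS row_num
    FROM my_table
) AS t
WHERE row_num = 1
ORDER BY col1
"""

# Keep a row only if no newer row exists for the same (col1, col2).
not_exists_sql = """
SELECT col1, col2, createddate, payload FROM my_table
WHERE NOT EXISTS (
    SELECT 1 FROM my_table AS t2
    WHERE t2.col1 = my_table.col1
      AND t2.col2 = my_table.col2
      AND t2.createddate > my_table.createddate
)
ORDER BY col1
"""

latest_by_row_number = conn.execute(row_number_sql).fetchall()
latest_by_not_exists = conn.execute(not_exists_sql).fetchall()
```

On this data both queries return the same two rows. They differ when two rows in a group tie on createddate: ROW_NUMBER keeps exactly one of them, while the NOT EXISTS form with a strict `>` keeps every tied row, so it does not remove exact duplicates.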
Leave a like if this helps! Kudos,
Palash

JakeerDE
New Contributor III

Hi @Palash01 

Thanks for the response. Below is what I am trying to do. However, it is throwing an error.

 

APPLY CHANGES INTO LIVE.targettable
FROM ( SELECT DISTINCT * 
FROM STREAM(sourcetable_1) tbl1
INNER JOIN STREAM(sourcetable_2) tbl2 ON tbl1.id = tbl2.id
WITH WATERMARK AS (tbl1.createddate))
KEYS (col1, col2) IGNORE NULL UPDATES SEQUENCE BY (createddate) STORED AS SCD TYPE 2;

 

org.apache.spark.sql.catalyst.parser.ParseException: [PARSE_SYNTAX_ERROR] Syntax error at or near 'WITH': missing ')'.

Is my query correct, or am I missing something?

