02-20-2024 04:18 AM - edited 02-20-2024 04:19 AM
Hi @Kaniz_Fatma,
We have a kafka source appending the data into bronze table and a subsequent DLT apply changes into to do the SCD handling. Finally, we have materialized views to create dims/facts.
We are facing issues, when we perform deduplication inside apply changes into like below.
Can someone help me understand what exactly I am missing here and how to resolve this?
We are using databricks SQL code.
APPLY CHANGES INTO LIVE.targettable
FROM
(
SELECT DISTINCT * FROM STREAM(sourcetable)
)
KEYS (col1, col2)
IGNORE NULL UPDATES
SEQUENCE BY (createddate)
STORED AS
SCD TYPE 2;
Error
org.apache.spark.sql.catalyst.ExtendedAnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
02-23-2024 01:39 AM
I have resolved this use case by adding TRACK HISTORY ON to the APPLY CHANGES INTO and removed the DISTINCT keyword.
APPLY CHANGES INTO LIVE.targettable
FROM ( SELECT *
FROM STREAM(sourcetable_1) tbl1
INNER JOIN STREAM(sourcetable_2) tbl2 ON tbl1.id = tbl2.id
WITH WATERMARK AS (tbl1.createddate))
KEYS (col1, col2) IGNORE NULL UPDATES SEQUENCE BY (createddate) STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (col1,col2);
Hope this helps to someone.
Thanks.
02-20-2024 08:38 PM
Hey @JakeerDE
The error message "Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark" indicates that the DISTINCT operation in your query, which acts as a streaming aggregation, requires a watermark to be defined if you're using the append output mode in apply changes into. Watermarks help track the progress of data streams and determine which data is considered "old" enough to be deleted.
Potential Solutions:
APPLY CHANGES INTO LIVE.targettable
FROM ( SELECT DISTINCT * FROM STREAM(sourcetable)
WITH WATERMARK AS (createddate) -- Assuming 'createddate' is your event time column )
KEYS (col1, col2) IGNORE NULL UPDATES SEQUENCE BY (createddate) STORED AS SCD TYPE 2;
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY col1, col2 ORDER BY createddate DESC) AS row_num FROM my_table ) AS t WHERE row_num = 1;
SELECT * FROM my_table WHERE NOT EXISTS ( SELECT 1 FROM my_table AS t2 WHERE t2.col1 = my_table.col1 AND t2.col2 = my_table.col2 AND t2.created_at > my_table.created_at );
df_deduplicated_multi = df.dropDuplicates(["id", "date"])
df_deduplicated_multi.show()
02-20-2024 10:43 PM - edited 02-20-2024 10:46 PM
Hi @Palash01
Thanks for the response. Below is what I am trying to do. However, it is throwing an error.
APPLY CHANGES INTO LIVE.targettable
FROM ( SELECT DISTINCT *
FROM STREAM(sourcetable_1) tbl1
INNER JOIN STREAM(sourcetable_2) tbl2 ON tbl1.id = tbl2.id
WITH WATERMARK AS (tbl1.createddate))
KEYS (col1, col2) IGNORE NULL UPDATES SEQUENCE BY (createddate) STORED AS SCD TYPE 2;
org.apache.spark.sql.catalyst.parser.ParseException: [PARSE_SYNTAX_ERROR] Syntax error at or near 'WITH': missing ')'.
Is my query correct or am I missing something.
02-23-2024 01:39 AM
I have resolved this use case by adding TRACK HISTORY ON to the APPLY CHANGES INTO and removed the DISTINCT keyword.
APPLY CHANGES INTO LIVE.targettable
FROM ( SELECT *
FROM STREAM(sourcetable_1) tbl1
INNER JOIN STREAM(sourcetable_2) tbl2 ON tbl1.id = tbl2.id
WITH WATERMARK AS (tbl1.createddate))
KEYS (col1, col2) IGNORE NULL UPDATES SEQUENCE BY (createddate) STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (col1,col2);
Hope this helps to someone.
Thanks.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group