Hey @JakeerDE
The error message "Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark" means that the DISTINCT in your query is treated as a streaming aggregation, and streaming aggregations require a watermark when the append output mode is used, as it is here under APPLY CHANGES INTO. A watermark tracks how far event time has progressed in the stream, so the engine knows when aggregation state can be finalized and emitted, and when late-arriving data can be dropped.
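For reference, here is roughly what the fix looks like in plain Structured Streaming (outside of APPLY CHANGES INTO). This is a minimal sketch, assuming createddate is your event-time column; the table names, checkpoint path, and the 10-minute delay are placeholders:
# 'spark' is the ambient SparkSession, as in a Databricks notebook.
events = spark.readStream.table("sourcetable")

# Declare how late data may arrive on the event-time column.
deduped = (
    events
    .withWatermark("createddate", "10 minutes")
    .distinct()  # SELECT DISTINCT * equivalent; now a watermarked streaming aggregation
)

# With the watermark declared, append output mode is accepted.
(deduped.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/tmp/_checkpoints/dedup_demo")  # placeholder path
    .toTable("dedup_demo_target"))  # placeholder target table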
Potential Solutions:
- Add a watermark to your streaming source using the WATERMARK clause before the DISTINCT operation. This tells the engine which column carries the event time and enables append mode. Example (a Python version of the same idea follows the SQL):
APPLY CHANGES INTO LIVE.targettable
FROM (
  SELECT DISTINCT *
  FROM STREAM(sourcetable) WATERMARK createddate DELAY OF INTERVAL 10 MINUTES  -- 'createddate' is your event-time column; tune the delay
)
KEYS (col1, col2)
IGNORE NULL UPDATES
SEQUENCE BY createddate
STORED AS SCD TYPE 2;
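If your pipeline is written in Python rather than SQL, the same shape looks roughly like this with the DLT Python API. This is a sketch with the same placeholder table and column names; double-check the apply_changes arguments against the API reference for your runtime:
# Runs inside a Delta Live Tables pipeline notebook.
import dlt
from pyspark.sql.functions import col

dlt.create_streaming_table("targettable")

@dlt.view
def deduped_source():
    return (
        spark.readStream.table("sourcetable")
        .withWatermark("createddate", "10 minutes")
        .distinct()
    )

dlt.apply_changes(
    target="targettable",
    source="deduped_source",
    keys=["col1", "col2"],
    sequence_by=col("createddate"),
    ignore_null_updates=True,
    stored_as_scd_type=2,
)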
- Window-based deduplication: rank rows with ROW_NUMBER() over a window partitioned by col1 and col2, and keep only the latest record within each group (a PySpark version follows the SQL):
SELECT *
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY col1, col2 ORDER BY createddate DESC) AS row_num
  FROM my_table
) AS t
WHERE row_num = 1;
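The PySpark equivalent of that ranking, assuming a DataFrame df with columns col1, col2, and createddate (note this is a batch/materialized-view pattern; arbitrary window functions like this are not supported on a streaming DataFrame):
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank rows within each (col1, col2) group, newest first, and keep rank 1.
w = Window.partitionBy("col1", "col2").orderBy(F.col("createddate").desc())
deduped = (
    df.withColumn("row_num", F.row_number().over(w))
      .filter(F.col("row_num") == 1)
      .drop("row_num")
)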
- Custom deduplication logic: implement your own deduplication with conditional filters or user-defined functions, based on your specific requirements. For example, keep only the most recent row per (col1, col2) pair with a NOT EXISTS check:
SELECT *
FROM my_table
WHERE NOT EXISTS (
  SELECT 1
  FROM my_table AS t2
  WHERE t2.col1 = my_table.col1
    AND t2.col2 = my_table.col2
    AND t2.createddate > my_table.createddate
);
- Drop duplicates using PySpark's dropDuplicates, keyed on the columns that define uniqueness:
df_deduplicated_multi = df.dropDuplicates(["col1", "col2"])
df_deduplicated_multi.show()
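On a streaming DataFrame, pair dropDuplicates with a watermark so the deduplication state does not grow without bound; a sketch, again assuming createddate is the event-time column:
streaming_df = spark.readStream.table("sourcetable")
deduped_stream = (
    streaming_df
    .withWatermark("createddate", "10 minutes")       # bound how long duplicate state is kept
    .dropDuplicates(["col1", "col2", "createddate"])  # include the event-time column in the key
)
On recent runtimes (Spark 3.5+), dropDuplicatesWithinWatermark(["col1", "col2"]) offers similar bounded-state behaviour without putting the event-time column in the key.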
- If append mode isn't essential, explore the other output modes supported with streaming aggregations (complete or update), but consider the implications for your downstream consumers and table structure. A quick illustration follows.
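In a plain Structured Streaming job (APPLY CHANGES INTO manages its own writes, so this applies outside that flow), the output mode is set on the writer. A sketch with placeholder names:
agg = (
    spark.readStream.table("sourcetable")
    .groupBy("col1", "col2")
    .count()
)

# 'update' emits only rows whose aggregate changed in each micro-batch;
# 'complete' rewrites the whole result every trigger.
(agg.writeStream
    .outputMode("update")
    .format("console")  # placeholder sink for illustration
    .start())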
Leave a like if this helps! Kudos,
Palash