Hi
I have a Structured Streaming job that reads from a Delta table "A" and writes to another Delta table "B".
Schema of A: group_key, id, timestamp, value
Schema of B: group_key, watermark_timestamp, derived_value
One requirement is that I need to get the max watermark_timestamp from "B" for each group (group_key) and join it with "A", so that for each group only the messages whose timestamp is greater than that group's watermark_timestamp are kept. After processing that data and updating state, I need to take the max timestamp of those messages and append it to B as the new watermark_timestamp for each group. Apart from this, I also write some additional data to the derived_value column for downstream use.
This ensures that already processed data does not enter the stream again.
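The per-group filtering step described above can be sketched in plain Python, with in-memory lists standing in for the Delta tables. This is only an illustration of the logic, assuming integer timestamps and omitting derived_value; `max_per_group` and `process_batch` are hypothetical helper names, not Spark APIs. In the actual job the same steps would roughly correspond to a groupBy/max on B, a join with A, and a filter.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical in-memory stand-ins for the two tables; timestamps are ints for simplicity.
ARow = Tuple[str, int, int, float]  # (group_key, id, timestamp, value)
BRow = Tuple[str, int]              # (group_key, watermark_timestamp); derived_value omitted


def max_per_group(pairs: Iterable[Tuple[str, int]]) -> Dict[str, int]:
    """Return the maximum timestamp seen for each group_key."""
    result: Dict[str, int] = {}
    for key, ts in pairs:
        if key not in result or ts > result[key]:
            result[key] = ts
    return result


def process_batch(a_rows: List[ARow], b_rows: List[BRow]) -> Tuple[List[ARow], List[BRow]]:
    """Filter A against B's per-group watermark and build the rows to append to B."""
    # Step 1: max watermark_timestamp per group, as currently stored in B
    # (B is append-only, so a group can have several rows).
    watermarks = max_per_group(b_rows)
    # Step 2: keep only rows newer than their group's watermark;
    # groups not yet present in B keep all their rows.
    fresh = [r for r in a_rows if r[0] not in watermarks or r[2] > watermarks[r[0]]]
    # Step 3: the new watermark per group is the max timestamp among the rows just processed.
    new_marks = max_per_group((r[0], r[2]) for r in fresh)
    return fresh, sorted(new_marks.items())


# Example: B already holds watermarks for g1 (max 100) and g2 (50); g3 is new.
b = [("g1", 80), ("g1", 100), ("g2", 50)]
a = [("g1", 1, 90, 1.0), ("g1", 2, 110, 2.0), ("g2", 3, 60, 3.0), ("g3", 4, 10, 4.0)]
fresh, to_append = process_batch(a, b)
```

Here the row with timestamp 90 for g1 is dropped because g1's watermark is 100, and `to_append` holds one new watermark row per group. B's watermarks are read once, before the rows to append are computed, which mirrors the intended order of operations within a batch.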
The problem is that I am reading from the same table I am writing to. When B is the sink, the job does not succeed at all. When I change the sink to a different table, say C, it proceeds.
I have tried everything I can think of, including collecting B's max watermark per group before the stream even starts. It still does not work.
What is the solution for this? Could someone please help?
More generally: if I need to buffer data for days, I don't want to keep everything in memory, apply a watermark in arbitrary stateful processing, and then filter. What is the best way to solve this? I was thinking of using SQL queries, which is what my approach above does.
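For the more general buffering question, the mechanics of watermark-driven state eviction can be illustrated with a small pure-Python sketch. It is not Spark's arbitrary stateful processing API; the `WatermarkBuffer` class and its `delay` parameter are hypothetical. The watermark is assumed to be the max event time seen so far minus a fixed delay: buffered events at or below it are released and removed from state, and late events at or below it are dropped.

```python
import heapq
from typing import Dict, List, Optional, Tuple


class WatermarkBuffer:
    """Hypothetical per-group buffer that releases events once the watermark passes them."""

    def __init__(self, delay: int) -> None:
        self.delay = delay
        self.max_event_time: Optional[int] = None
        # Per-group min-heaps of (timestamp, value), ordered by event time.
        self.buffers: Dict[str, List[Tuple[int, float]]] = {}

    def watermark(self) -> Optional[int]:
        # Watermark = max event time seen so far minus the allowed delay.
        return None if self.max_event_time is None else self.max_event_time - self.delay

    def add(self, group_key: str, ts: int, value: float) -> List[Tuple[str, int, float]]:
        wm = self.watermark()
        if wm is not None and ts <= wm:
            return []  # late event: already behind the watermark, so it is dropped
        heapq.heappush(self.buffers.setdefault(group_key, []), (ts, value))
        if self.max_event_time is None or ts > self.max_event_time:
            self.max_event_time = ts
        return self._evict()

    def _evict(self) -> List[Tuple[str, int, float]]:
        wm = self.watermark()
        if wm is None:
            return []
        released: List[Tuple[str, int, float]] = []
        for key, heap in self.buffers.items():
            # Release every buffered event at or below the watermark.
            while heap and heap[0][0] <= wm:
                ts, value = heapq.heappop(heap)
                released.append((key, ts, value))
        # Drop groups with nothing left so their state is freed.
        self.buffers = {k: h for k, h in self.buffers.items() if h}
        return released


buf = WatermarkBuffer(delay=10)
out: List[Tuple[str, int, float]] = []
out += buf.add("g1", 100, 1.0)  # watermark 90, nothing released
out += buf.add("g2", 95, 2.0)   # still 90, nothing released
out += buf.add("g1", 112, 3.0)  # watermark 102, releases g1@100 and g2@95
out += buf.add("g2", 101, 4.0)  # 101 <= 102, so this late event is dropped
```

In this sketch the buffer only holds events newer than the watermark, so a delay of several days would still mean several days of buffered data per group; the watermark bounds the state rather than eliminating it.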