Short version: I need a way to take only the most recent record, by ID, from a variable number of tables in a stream. This is a relatively easy problem in SQL or Python pandas (group by and take the newest), but in a stream I keep hitting roadblocks. I could do it outside a stream, or merge the full history and create a view of only the most recent records, but I anticipate querying these result tables very regularly, and recalculating them every day (or on every query) will get wasteful fast. And frankly, at that point I'm just building a traditional database, which somewhat defeats the purpose of Databricks.
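To pin down the target semantics, here is the "easy" batch version in pandas: group by ID and keep the newest row. The column names (`id`, `ts`, `value`) are placeholders I've made up for illustration, not anything from my actual schema.

```python
import pandas as pd

# Toy event table: several events per id, with an event timestamp.
events = pd.DataFrame(
    {
        "id": [1, 1, 2, 2, 3],
        "ts": [10, 20, 5, 15, 8],  # event timestamp (placeholder column name)
        "value": ["a", "b", "c", "d", "e"],
    }
)

# Sort by timestamp, then keep the last (newest) row per id.
latest = (
    events.sort_values("ts")
    .drop_duplicates("id", keep="last")
    .sort_values("id")
    .reset_index(drop=True)
)
```

This is exactly the result I want the stream to maintain incrementally, without recomputing it from the full history each time.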
Long Version: Many of our data feeds come as separate event entities, which I need to merge into a single “current” table. This means I’m trying to pull the most recent record, by ID, out of the source tables (variable number, can be more than two) into a single final table, all using streams and in a generalizable way for reuse.
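For concreteness, the merge I'm after looks like this in batch form (again in pandas, with hypothetical `id`/`ts` columns): union a variable number of source tables, then keep the newest record per ID. The open question is how to express this incrementally with streams.

```python
import pandas as pd

def current_table(*sources: pd.DataFrame) -> pd.DataFrame:
    """Union any number of event tables and keep only the newest record per id.

    Assumes every source shares an 'id' key and a comparable 'ts' column;
    both names are placeholders for illustration.
    """
    merged = pd.concat(sources, ignore_index=True)
    return (
        merged.sort_values("ts")
        .drop_duplicates("id", keep="last")
        .reset_index(drop=True)
    )

# Two toy feeds; id 1 appears in both, and feed_b has the newer event.
feed_a = pd.DataFrame({"id": [1, 2], "ts": [10, 5], "value": ["a", "c"]})
feed_b = pd.DataFrame({"id": [1, 3], "ts": [20, 8], "value": ["b", "e"]})
current = current_table(feed_a, feed_b)
```

The `*sources` signature is the "variable number of tables, generalizable for reuse" part; in batch it's trivial, which is what makes the streaming case so frustrating.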
First, I tried merging via the SQL temp view approach discussed in many of the Databricks Academy videos (which allows a somewhat SQL-like solution), but I end up blocked: I can't use Append output mode, because the purpose of this join is to update existing records when relevant (e.g. if an event is an update, the record in the output table should be replaced, not appended). I could potentially use Complete mode instead, but as I understand it that would rewrite the entire destination table on every trigger, meaning I lose all my streaming efficiency.
I also tried a stream-stream join in PySpark directly from the multiple source tables, joining them with the static result of a subquery. This got quite complex: I had to join the static DataFrame from the subquery with multiple streaming DataFrames, work out all the watermarks, and then join those streaming DataFrames with one another. In the end I still hit the same error as above.
I have a third approach that breaks things out into multiple notebooks, splitting insertion events (append) from update events (upsert), but I'm hitting syntax errors there too, and it requires a chain of tasks that gets complex and slows down the time to results. It may be that this is the only workable approach, but debugging those individual errors is proving complex as well, particularly the upsert that should replace a record only if the incoming timestamp is newer.
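To make the upsert rule I'm debugging concrete, here is a plain-Python sketch of the intended semantics (with the same hypothetical `id`/`ts` keys): insert when the ID is new, replace only when the incoming timestamp is strictly newer. This is the condition I'm trying to express; it is not the Databricks implementation itself.

```python
def apply_batch(current: dict, batch: list) -> dict:
    """Upsert a micro-batch of records into the 'current' table (keyed by id).

    Insert when the id is unseen; overwrite only when the incoming record's
    timestamp is strictly newer. Late or duplicate events therefore cannot
    clobber a fresher record.
    """
    for rec in batch:
        existing = current.get(rec["id"])
        if existing is None or rec["ts"] > existing["ts"]:
            current[rec["id"]] = rec
    return current

state = {}
apply_batch(state, [{"id": 1, "ts": 10, "value": "a"}])
apply_batch(
    state,
    [
        {"id": 1, "ts": 5, "value": "stale"},  # older event for id 1: ignored
        {"id": 2, "ts": 7, "value": "b"},      # new id: inserted
    ],
)
```

In SQL terms this is what a merge with a timestamp guard on the matched branch would express; getting that guard right per micro-batch is exactly where my debugging keeps stalling.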
In the midst of all this, it struck me that I should just ask: is there a recommended solution for this pattern?