cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

How to take only the most recent record from a variable number of tables in a stream

tom_shaffner
New Contributor III

Short version: I need a way to take only the most recent record from a variable number of tables in a stream. This is a relatively easy problem in sql or python pandas (group by and take the newest) but in a stream I keep hitting blocks. I could do it outside a stream or merge the full history and create a view of only the most recent, but I anticipate querying these result tables very regularly and having to recalculate them every day or every query will get wasteful fast. And frankly, at that point I’m just building a traditional database which somewhat defeats the purpose of databricks.

Long Version: Many of our data feeds come as separate event entities, which I need to merge into a single “current” table. This means I’m trying to pull the most recent record, by ID, out of the source tables (variable number, can be more than two) into a single final table, all using streams and in a generalizable way for reuse.

First, if I try to merge via the SQL temp view approach discussed in many of the Databricks Academy videos (which allows me to create a somewhat SQL-like solution) I end up with this: temp" data-fileid="0698Y00000JF9NlQALI can’t use Append mode however as the purpose of this join is to update with new records when relevant (e.g. if an event is an update, the record in the output table should be replaced). I could, potentially, use complete mode instead, but that would recreate the entire destination table each time I believe, meaning I lose all my streaming efficiency.

I also tried an approach where I did a stream-stream join in pyspark directly from multiple source tables and joined them with the static result of a subquery. This got quite complex as I had to join the static dataframe of the subquery with multiple streaming dataframes, work out all the watermarks, and join those dataframes with one another. In the end I still hit the above error.

I have another approach too which breaks things out into multiple notebooks splitting insertion events (append) from update events (upsert) but I’m hitting syntax errors on these also, and this approach requires a chain of tasks that can get complex and slows down the speed of results. It may be that this is the only workable approach, but debugging those individual errors, particularly when upserting something that should replace only if the timestamp is newer, is proving complex as well.

In the midst of this, it struck me I should just ask if there’s a recommended solution here.

3 REPLIES 3

Håkon_Åmdal
New Contributor III

Did you try storing it all to a DELTA table with a MERGE INTO [1]? You can optionally specify a condition on "WHEN MATCHED" such that you only insert if the timestamp is newer.

[1] https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-merge-into.html

Yes, "Append" mode is doing a Upsert/Merge in pyspark, which is the same thing as the SQL Merge Into I believe; that's what gave me the above error.

And I had another version that did a Merge-Into by individual data source, but I spoke to Databricks and they recommended against doing that as I'd need to have sequential merges from each source to avoid the writes conflicting, and an error in one would result in a failure across them all.

@Håkon Åmdal​ to be clear, I think what you're suggesting is in fact the only approach that might work. i.e. an upsert for each data source, sequentially.

This being the only way seems to be a spark limitation; as the answer at apache spark - Structured Streaming extract most recent values for each id - Stack Overflow discusses. There are multiple discussed solutions for a joint-approach online but none seemed to work for the answerer there, nor do they for me, meaning it needs to be done sequentially instead of jointly.

I may still switch to that approach, but before I do I'd like to understand better why the databricks engineer I talked to recommended against it and what the alternatives are. The one the engineer suggested seemed to suggest going to a more noSQL-like style of tables and processing updates via aggregate functions on that result; I'm still working through understanding that.

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.