07-12-2024 04:47 AM
Hi Team,
We have one huge streaming table from which we want to create another streaming table that picks only a few columns from the original. In this new table the rows must be unique.
Can someone please help me with the implementation of this scenario? If it isn't possible, why not, and are there any alternatives?
Thanks.
07-14-2024 01:08 AM
You can do this using Delta Live Tables. For the design you can follow the medallion architecture (https://www.databricks.com/glossary/medallion-architecture).
You can have your first table as bronze, with everything just appended, and your second table as silver, with defined keys and only the selected columns.
You can take a look here for how to implement this - https://docs.databricks.com/en/delta-live-tables/tutorial-pipelines.html
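For illustration, a minimal DLT SQL sketch of that bronze/silver split (the table names, volume path, and clicks columns are hypothetical, borrowed from the example used later in this thread):

CREATE OR REFRESH STREAMING TABLE clicks_bronze AS
SELECT *, current_timestamp() AS load_time  -- append everything as-is, plus an ingestion timestamp
FROM cloud_files(
  '/Volumes/dev/demo_db/landing_zone/clicks_data',  -- hypothetical volume path
  'csv',
  map('cloudFiles.inferColumnTypes', 'true')
);

CREATE OR REFRESH STREAMING TABLE clicks_silver AS
SELECT user_id, department_id, SUM(clicks) AS clicks  -- selected columns, one row per key
FROM STREAM(LIVE.clicks_bronze)
GROUP BY user_id, department_id;

The silver table keeps (user_id, department_id) unique by aggregating rather than by dropping duplicates, which keeps the processing append-only and streaming-friendly.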
07-14-2024 01:49 AM - edited 07-14-2024 01:52 AM
Hi @MyTrh ,
I think what you can try is to read from the first streaming table and then use foreachBatch when writing to the target table. Inside foreachBatch you would define your merge logic, and that should do the trick.
Edit: I didn't notice that you mentioned DLT. Ignore this post then, because I can't delete it.
07-15-2024 01:28 AM
Thanks for the responses
It seems that I was not able to explain the problem statement properly, so here it is:
I have one streaming table in my DLT pipeline, say table1, with schema (user_id, department_id, time_stamp, clicks).
From this streaming table I want to make another table (streaming table or materialized view), table2, with schema (user_id, department_id, clicks). In this new table the combination of user_id and department_id must be unique, and clicks should be updated every time new data arrives in table1.
So far I have been able to build table2 as a materialized view, but it does a complete refresh (flow type: complete) each time table2 is calculated. What I want is an incremental update (flow type: incremental) on table2, so that it doesn't need to read the whole of table1 to rebuild table2.
Again, I have seen answers on incremental refresh, but none of them is in a DLT pipeline notebook, so I am not sure whether this is even supported in a DLT pipeline. It would be great if someone could provide sample code along with an explanation.
Thanks in advance.
07-15-2024 07:59 AM - edited 07-15-2024 08:00 AM
Hi @MyTrh ,
Thanks for the clarification. I'll try to recreate your use case tomorrow and test it. But your approach seems correct. Like you, I would create a materialized view that consumes changes from the streaming table.
Could you check whether the query you use to define the materialized view uses only the keywords listed below? They are required for incremental refresh, so if you are using keywords that aren't on the list, that could be the reason you get a complete refresh.
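For reference, a minimal sketch of a materialized view definition that sticks to a simple projection plus a GROUP BY aggregate (table1 and the columns follow the example schema from this thread; whether the refresh actually runs incrementally still depends on the pipeline meeting the incremental refresh requirements):

CREATE OR REFRESH MATERIALIZED VIEW table2 AS
SELECT
  user_id,
  department_id,
  SUM(clicks) AS clicks  -- one row per (user_id, department_id)
FROM LIVE.table1
GROUP BY user_id, department_id;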
07-16-2024 09:30 AM
Thanks @szymon_dybczak for looking into my query.
Yes, I am using only the allowed keywords to define the materialized view.
It would be really helpful if you could provide a notebook solution for this use case.
07-17-2024 01:21 AM - edited 07-17-2024 01:26 AM
Hi @MyTrh ,
Ok, I think I created a use case similar to yours. I have a streaming table with a column structure based on your example:
CREATE OR REFRESH STREAMING TABLE clicks_raw AS
SELECT *, current_timestamp() AS load_time
FROM cloud_files(
  '/Volumes/dev/demo_db/landing_zone/clicks_data',
  "csv",
  map("cloudFiles.inferColumnTypes", "true")
)
Now I have another streaming table which consumes the first one. Here I group to get each distinct combination of user_id and department_id, and I aggregate the click count for each group.
So, as you can see from the screenshot below, take for example user_id = user_001 and department_id = dept_01.
We can see that in clicks_raw there are two rows for this combination:
- one with 20 clicks
- another one with 15 clicks
So the final result in our target table for this user and this department should be 35.
CREATE OR REFRESH STREAMING TABLE clicks_aggregated AS
SELECT user_id, department_id, SUM(clicks) AS clicks
FROM STREAM(LIVE.clicks_raw)
GROUP BY user_id, department_id
As you can see from the screenshot below, I achieved an incremental refresh.
07-18-2024 10:59 PM
Thank you very much @szymon_dybczak this worked for me.
But I wonder, then: what is the difference between a streaming table and a materialized view, if we can incrementally update the streaming table as well?