
Delta table with unique columns incremental refresh

MyTrh
New Contributor III

Hi Team,

We have one huge streaming table, and we want to create another streaming table from it that picks only a few columns from the original. The rows in this new table must be unique.

Can someone please help me implement this scenario, if it is possible? If it is not, why not, and are there any alternatives?

Thanks.

Accepted Solution

szymon_dybczak
Esteemed Contributor III

Hi @MyTrh ,

OK, I think I have created a use case similar to yours. I have a streaming table whose column structure is based on your example:

image.png

CREATE OR REFRESH STREAMING TABLE clicks_raw AS
SELECT
  *,
  current_timestamp() AS load_time  -- ingestion time of each row
FROM cloud_files(
  '/Volumes/dev/demo_db/landing_zone/clicks_data',
  'csv',
  map('cloudFiles.inferColumnTypes', 'true')
);

 

Now I have a second streaming table that consumes the first one. It groups by user_id and department_id, so each combination appears only once, and it sums the clicks for each group.

As the screenshot below shows, take user_id = user_001 and department_id = dept_01 as an example.

In clicks_raw there are two rows for this combination:

        - one with 20 clicks
        - another one with 15 clicks

So the final result in the target table for this user and this department should be 35 (20 + 15).

 

image (1).png

 

CREATE OR REFRESH STREAMING TABLE clicks_aggregated AS
SELECT
  user_id,
  department_id,
  SUM(clicks) AS clicks  -- total clicks per (user_id, department_id)
FROM STREAM(LIVE.clicks_raw)
GROUP BY user_id, department_id;

As you can see from the screenshot below, I achieved an incremental refresh.

image (2).png
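
To make the idea of an incremental refresh concrete, here is a minimal plain-Python sketch of what the grouped streaming query does conceptually: it keeps one running total per (user_id, department_id) and folds in only the newly arrived rows. This is an illustration, not how DLT is implemented. The function name apply_batch and the user_002 row are hypothetical; the user_001 values come from the example above.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

Key = Tuple[str, str]  # (user_id, department_id)


def apply_batch(totals: Dict[Key, int], batch: Iterable[dict]) -> Dict[Key, int]:
    """Fold only the newly arrived rows into the running totals."""
    for row in batch:
        totals[(row["user_id"], row["department_id"])] += row["clicks"]
    return totals


totals: Dict[Key, int] = defaultdict(int)

# First micro-batch: one row for user_001 / dept_01.
apply_batch(totals, [
    {"user_id": "user_001", "department_id": "dept_01", "clicks": 20},
])

# Second micro-batch: only the new rows are read; earlier rows are not rescanned.
apply_batch(totals, [
    {"user_id": "user_001", "department_id": "dept_01", "clicks": 15},
    {"user_id": "user_002", "department_id": "dept_01", "clicks": 7},  # hypothetical row
])

print(totals[("user_001", "dept_01")])  # 35
```

Each key appears exactly once in totals, which is the uniqueness requirement from the question, and the work per refresh is proportional to the size of the new batch rather than to the size of the whole source table.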


7 REPLIES

p4pratikjain
Contributor

You can do this using Delta Live Tables. For the design, you can follow the medallion architecture (https://www.databricks.com/glossary/medallion-architecture).
Your first table can be a bronze table where everything is simply appended, and your second table a silver table with defined keys and selected columns.
For how to implement this, take a look here: https://docs.databricks.com/en/delta-live-tables/tutorial-pipelines.html

Pratik Jain

szymon_dybczak
Esteemed Contributor III

Hi @MyTrh , 

I think what you can try is to read from the first streaming table and then use foreachBatch when writing to the target table. Inside foreachBatch you define the merge logic, and that should do the trick.

Edit: I didn't notice that you had mentioned DLT. Ignore this post then, because I can't delete it 😉
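
For readers who are not in a DLT pipeline, the foreachBatch idea above could be sketched roughly as follows with Structured Streaming and the Delta Lake Python API. This is a sketch under assumptions, not a tested pipeline: the table name demo.clicks_aggregated and the helper names (merge_condition, upsert_batch, start_stream) are hypothetical, the target is assumed to be an existing Delta table with columns (user_id, department_id, clicks), and batch_df.sparkSession assumes PySpark 3.3 or later.

```python
from typing import List

KEYS: List[str] = ["user_id", "department_id"]
TARGET_TABLE = "demo.clicks_aggregated"  # hypothetical target Delta table name


def merge_condition(keys: List[str]) -> str:
    """Build the condition that matches target (t) and source (s) rows on the keys."""
    return " AND ".join(f"t.{k} = s.{k}" for k in keys)


def upsert_batch(batch_df, batch_id: int) -> None:
    """Merge one micro-batch into the target so that each key appears only once."""
    # Imported lazily so this module can be loaded without Spark installed.
    from delta.tables import DeltaTable
    from pyspark.sql import functions as F

    # Collapse the micro-batch to one row per key before merging.
    batch_totals = batch_df.groupBy(*KEYS).agg(F.sum("clicks").alias("clicks"))

    target = DeltaTable.forName(batch_df.sparkSession, TARGET_TABLE)
    (
        target.alias("t")
        .merge(batch_totals.alias("s"), merge_condition(KEYS))
        # Existing key: add the new clicks to the stored total.
        .whenMatchedUpdate(set={"clicks": "t.clicks + s.clicks"})
        # New key: insert the row as-is.
        .whenNotMatchedInsertAll()
        .execute()
    )


def start_stream(spark, source_table: str, checkpoint_path: str):
    """Read the source table as a stream and apply upsert_batch to each micro-batch."""
    return (
        spark.readStream.table(source_table)
        .writeStream.foreachBatch(upsert_batch)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

One design caveat: because the update adds to the stored total, a micro-batch that is retried after a partial failure could be counted twice, so this pattern would need extra care (for example, tracking batch_id) if exact totals matter.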

MyTrh
New Contributor III

Thanks for the responses.

It seems that I was not able to explain the problem statement properly, so here it is:

I have one streaming table in my DLT pipeline, say table1, with schema (user_id, department_id, time_stamp, clicks).

From this streaming table I want to create another table (a streaming table or a materialized view), table2, with schema (user_id, department_id, clicks). In this new table the combination of user_id and department_id must be unique, and clicks should be updated every time new data arrives in table1.

So far I have been able to create table2 as a materialized view, but it does a complete refresh (flow type: complete) each time it calculates table2. What I want is an incremental update (flow type: incremental) of table2, so that it doesn't need to read the whole of table1 to rebuild table2.

Again, I have seen answers on incremental refresh, but none of them is in a DLT pipeline notebook, so I am not sure whether this is even supported in a DLT pipeline. It would be great if someone could provide sample code along with an explanation.

Thanks in advance.

szymon_dybczak
Esteemed Contributor III

 

 

Hi @MyTrh ,

Thanks for the clarification. Tomorrow I'll try to recreate your use case and test it. But your approach seems to be correct. Like you, I would create a materialized view that consumes changes from the streaming table.

Could you check whether the query that defines your materialized view uses only the keywords listed at the link below? They are required for incremental refresh, so if you are using keywords that are not listed there, that could be the reason why you get a complete refresh.

https://learn.microsoft.com/en-us/azure/databricks/optimizations/incremental-refresh#support-for-mat...

MyTrh
New Contributor III

Thanks @szymon_dybczak for looking into my query.

Yes, I am using allowed keywords only to define the materialized view.

It would be really helpful if you could provide a notebook solution for this use case.

 

MyTrh
New Contributor III

Thank you very much @szymon_dybczak, this worked for me.

But then I wonder: what is the difference between a streaming table and a materialized view, given that we can also incrementally "UPDATE" a streaming table?
