cancel
Showing results for 
Search instead for 
Did you mean: 
Community Discussions
cancel
Showing results for 
Search instead for 
Did you mean: 

inegstion time clustering

mderela
New Contributor II

Hello, in rerence to https://www.databricks.com/blog/2022/11/18/introducing-ingestion-time-clustering-dbr-112.html

I have a silly question how to use it. So let's assume that I have a few TB of not partitioned data. So, if I would like to query on data that has been ingested starting from yesterday, what I should do?

select * from mytable where WHAT_SHOULD_BE_HERE >= current_timestamp() - INTERVAL 1 day

 In other words - what I need to query on to make sure that only small part of "files" will be "scaned" instead of whole dataset. It is clear for me how to achive that with using partitions but how with ingestion time clustering?

 

 

1 ACCEPTED SOLUTION

Accepted Solutions

Kaniz
Community Manager
Community Manager

Hi @mderelayou can add the pipelines—Auto-optimize—zOrderCols parameter to optimize data skipping with Z-order indexes. Z-ordering is a technique to colocate related information in the same set of files, which is automatically used by data-skipping algorithms to reduce the amount of data that needs to be read. To Z-order data, specify the columns to order on in the ORDER BY clause.

For example, to colocate by gender, run:

sql OPTIMIZE people_10m ZORDER BY (gender) 

. You can specify multiple columns for ZORDER BY as a comma-separated list, but the effectiveness of the locality drops with each extra column. Z-ordering on columns that do not have statistics collected on them would be ineffective and a waste of resources. Data skipping requires column-local stats such as min, max, and count. You can configure statistics collection on specific columns by reordering columns in the schema or increasing the number of columns to collect statistics on.

Sources: Docs: data-skippingDocs: tutorial

View solution in original post

6 REPLIES 6

Kaniz
Community Manager
Community Manager

Hi @mderela , 

If you have ingested data using ingestion time clustering, you can use the ingesttimestamp column to filter data based on when it was ingested. Your query would look like this:

SELECT * FROM mytable WHERE ingesttimestamp >= current_timestamp() - INTERVAL 1 day

This will only scan the data that was ingested in the past day. Remember that this will only work if you have set up ingestion time clustering for your table. If not, you must partition your data or scan the entire dataset.

mderela
New Contributor II

thank you @Kaniz 

Could you please put a little bit more light on configuration ? So, for instance - I am performing ingestion with using DLT. Should I add extra parameters (like pipelines.autoOptimize.zOrderCols) or it should be done in other way?

Kaniz
Community Manager
Community Manager

Hi @mderelayou can add the pipelines—Auto-optimize—zOrderCols parameter to optimize data skipping with Z-order indexes. Z-ordering is a technique to colocate related information in the same set of files, which is automatically used by data-skipping algorithms to reduce the amount of data that needs to be read. To Z-order data, specify the columns to order on in the ORDER BY clause.

For example, to colocate by gender, run:

sql OPTIMIZE people_10m ZORDER BY (gender) 

. You can specify multiple columns for ZORDER BY as a comma-separated list, but the effectiveness of the locality drops with each extra column. Z-ordering on columns that do not have statistics collected on them would be ineffective and a waste of resources. Data skipping requires column-local stats such as min, max, and count. You can configure statistics collection on specific columns by reordering columns in the schema or increasing the number of columns to collect statistics on.

Sources: Docs: data-skippingDocs: tutorial

JKR
New Contributor III

Hi @Kaniz  refereing this "Remember that this will only work if you have set up ingestion time clustering for your table".

Can you please elobrate how can we setup "ingestion time clustering" for existing non-partitioned tables?

Kaniz
Community Manager
Community Manager

Hi @JKR , 

Enable Liquid Clustering:
  - Enable liquid clustering when creating a table
  - Add CLUSTER BY phrase to the table creation statement

 Choose Clustering Keys:
  - Select clustering keys based on commonly used query filters
  - If two columns are correlated, only add one as a clustering key

Change Clustering Keys:

  - Use ALTER TABLE command to change clustering keys for a table

•Trigger Clustering:
  - Use Databricks 13.2 or above
  - Use OPTIMIZE command on the table to trigger clustering

• Maintain Ingestion Time Clustering:
  - Use OPTIMIZE with ZORDER BY When performing a large number of modifications using UPDATE or MERGE statements
  - Use a column that matches the ingestion order (e.g., event timestamp or creation date). Note: Databricks recommends liquid clustering for new Delta tables, especially for tables filtered by high cardinality columns, skewed data distribution, fast-growing tables, tables with concurrent write requirements, changing access patterns, and tables with partition keys that result in too many or too few partitions.

Additional Resources
 

 

JKR
New Contributor III

@Kaniz Thank for sharing this, it is really helpful, but my question remains the same that how can we know Ingestion Time Clustering is enabled?  As per doc it is enabled by default with DBR 11.2 & above.

- Does Ingestion Time Clustering and Liquid clustering are similar? 

-  What about the existing non-partitioned tables? Can I enable liquid clustering on those if I upgrade my interactive clusters to use Databricks 13.2 or above?

My scenario is I have some delta non-partitioned tables around 200 to 300 GB of data in each table. and ETL requirement is to get max timestamp, so what I do is select max(timestamp) from table every 5 minutes on those tables separately in different jobs and then further utilize thse max_timestamp in their ETL pipelines.

max_timestamp query is taking around more than 2.5 minutes to fetch the max_timestamp from those tables. Upon check the Spark UI and DAG I found out this query is reading all the files behind the table and not pruning any file that is why it's taking that much time only to fetch max(timestamp).

What should I do to get that max(timestamp) in lesser time (less than 10 secs) without partitioning the table as it recommended by Databricks to only partition tables if we have table size greater than 1 TB.

Thanks