10-30-2024 06:32 AM - edited 10-30-2024 06:34 AM
I've created a DLT pipeline that creates type 2 SCDs, and often the __start_at and __end_at columns fall beyond the first 32 columns used for stats collection.
I'd like to add these columns to liquid clustering without increasing the number of columns in the stats, and in a way that is part of the table generation code (i.e. without having a separate ANALYZE TABLE block, unless this can be done within the DLT code itself).
The code below defines a view to filter the bronze table and uses apply changes into to push into the silver table SCD:
import dlt

@dlt.view(
    name = bronze_view_name
)
def bronze_table_view():
    return spark.readStream.format("delta").table(f"live.{bronze_table_name}").where(silver_transformation_where_clause).selectExpr(silver_transformation_select)

dlt.create_streaming_table(
    name = silver_table_name,
    comment = silver_table_comment,
    cluster_by = cluster_keys_silver  # array of columns to include in liquid clustering
)

dlt.apply_changes(
    target = silver_table_name,
    source = bronze_view_name,
    keys = silver_cdc_keys,  # array of key columns
    sequence_by = silver_cdc_sequence_by,  # array of columns
    stored_as_scd_type = silver_cdc_scdtype  # 2
)
The DLT pipeline uses metadata to generate tables, which makes hard-coding the solution an issue.
10-30-2024 06:52 AM
Hi @Kguy,
To implement liquid clustering on the __start_at and __end_at columns in a Delta Live Tables (DLT) pipeline for a type 2 Slowly Changing Dimension (SCD) without manually increasing the number of columns in stats collection, configure the cluster_by columns:
Define __start_at and __end_at in cluster_by at the table level to improve query performance. This clustering will focus the layout on these columns, enhancing the performance of time-based filtering.
import dlt
from pyspark.sql import functions as F

# Define the bronze view to filter your input data
@dlt.view
def bronze_table_view():
    return (
        spark.readStream.format("delta")
        .table(f"live.{bronze_table_name}")
        .where(silver_transformation_where_clause)
        .selectExpr(silver_transformation_select)
    )

# Create the silver table with liquid clustering on start and end dates
dlt.create_streaming_table(
    name=silver_table_name,
    comment=silver_table_comment,
    cluster_by=["__start_at", "__end_at"],  # Liquid clustering on the SCD validity columns
)

# Apply type 2 SCD logic with `apply_changes`
dlt.apply_changes(
    target=silver_table_name,
    source="bronze_table_view",  # Name of the view defined above
    keys=silver_cdc_keys,  # Primary keys for SCD
    sequence_by=silver_cdc_sequence_by,  # Sequence column for ordering
    stored_as_scd_type="2",  # Define as type 2 SCD
)
Using these methods ensures that your __start_at and __end_at columns are effectively clustered, improving query performance without a need to extend stats collection explicitly for these columns.
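Since the pipeline is generated from metadata, the hard-coded list above could instead be derived from the existing metadata values. The following is only a minimal sketch: `build_cluster_keys` is a hypothetical helper (not part of the dlt API), and it assumes the metadata supplies the clustering keys and SCD type as in the question. It also enforces the limit of four clustering columns that liquid clustering allows.

```python
from typing import Iterable, List, Union

# Liquid clustering accepts at most four clustering columns.
MAX_CLUSTER_COLUMNS = 4


def build_cluster_keys(
    metadata_keys: Iterable[str],
    scd_type: Union[int, str],
    scd_columns: Iterable[str] = ("__start_at", "__end_at"),
) -> List[str]:
    """Hypothetical helper: merge metadata clustering keys with the SCD columns.

    The SCD validity columns are appended only for type 2 tables, and
    duplicates are dropped case-insensitively while preserving order.
    """
    candidates = list(metadata_keys)
    if str(scd_type) == "2":
        candidates += list(scd_columns)

    seen = set()
    keys: List[str] = []
    for name in candidates:
        if name.lower() not in seen:
            seen.add(name.lower())
            keys.append(name)

    if len(keys) > MAX_CLUSTER_COLUMNS:
        raise ValueError(
            f"{len(keys)} clustering columns requested, "
            f"but at most {MAX_CLUSTER_COLUMNS} are allowed: {keys}"
        )
    return keys
```

The result could then be passed as cluster_by=build_cluster_keys(cluster_keys_silver, silver_cdc_scdtype) in dlt.create_streaming_table.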
Let me know if you need further details on these configurations!
Regards!
10-30-2024 07:39 AM
Hi Alfonso,
How does this address the issue of the columns not having stats, as they are beyond the first 32 columns of the table?
10-30-2024 09:07 AM
Good question! In Databricks, Delta Lake by default collects statistics automatically only for the first 32 columns in a table, for performance reasons. When the __start_at and __end_at columns are beyond this limit, they are excluded from automatic stats collection.
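To see which columns are affected for a given table, the column positions can be checked against the limit. This is a rough sketch under some assumptions: `columns_without_stats` is a hypothetical helper, the schema is treated as flat (nested struct fields are not accounted for), and the limit is taken to be the default of 32.

```python
from typing import List, Sequence

DEFAULT_STATS_COLUMN_LIMIT = 32  # default number of leading columns with stats


def columns_without_stats(
    table_columns: Sequence[str],
    columns_of_interest: Sequence[str],
    limit: int = DEFAULT_STATS_COLUMN_LIMIT,
) -> List[str]:
    """Hypothetical helper: return the columns of interest positioned past the limit.

    Names are matched case-insensitively. A KeyError is raised if a column
    of interest does not exist in the table.
    """
    positions = {name.lower(): index for index, name in enumerate(table_columns)}
    missing: List[str] = []
    for name in columns_of_interest:
        index = positions.get(name.lower())
        if index is None:
            raise KeyError(f"Column not found in table: {name}")
        if index >= limit:  # positions are zero-based, so index 32 is the 33rd column
            missing.append(name)
    return missing
```

For example, it could be called with spark.table(silver_table_name).columns as the first argument and the clustering keys as the second.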
However, there are ways to improve query performance on these columns without including them in the stats.
An indirect way to ensure that __start_at and __end_at are included in the statistics is to rearrange the table layout so that these columns are within the first 32 positions. This involves recreating the table with the columns reordered so that the most relevant columns appear first.
# Assuming your current table is called `silver_table_name`.
df = spark.read.table("silver_table_name")
# Put the SCD columns first (names compared case-insensitively)
scd_cols = ["__start_at", "__end_at"]
new_schema = scd_cols + [col for col in df.columns if col.lower() not in scd_cols]
# Reorder the DataFrame
reordered_df = df.select(new_schema)
# Write the table with the reordered schema
reordered_df.write.mode("overwrite").format("delta").saveAsTable("new_silver_table_name")
⚠️ This solution may require additional work for schema management, especially in a production environment, and may not be feasible if the schema is handled automatically.
Although we cannot force Delta Lake to include specific columns in the automatic statistics, you can run a manual analysis to include any column you wish.
ANALYZE TABLE your_table_name COMPUTE STATISTICS FOR COLUMNS __start_at, __end_at;
You can read more at this link: https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-aux-analyze-table.html
This command can be used in combination with ZORDER or cluster_by optimisations to improve performance without relying solely on automatic statistics.
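For a metadata-driven pipeline, the statement above could be generated from the same metadata instead of being hard-coded. The sketch below is only an illustration: `build_analyze_statement` is a hypothetical helper, and it assumes plain (unquoted) identifiers. Whether the statement can run inside the DLT pipeline itself is not established here, so it is assumed to be executed separately, for example with spark.sql in a task that runs after the pipeline update.

```python
import re
from typing import Sequence

# Plain identifiers only; anything else is rejected rather than quoted.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_analyze_statement(table_name: str, columns: Sequence[str]) -> str:
    """Hypothetical helper: build an ANALYZE TABLE statement for the given columns."""
    if not columns:
        raise ValueError("At least one column is required")
    # Validate each part of a possibly qualified name (catalog.schema.table)
    for part in table_name.split(".") + list(columns):
        if not _IDENTIFIER.match(part):
            raise ValueError(f"Unsupported identifier: {part!r}")
    column_list = ", ".join(columns)
    return f"ANALYZE TABLE {table_name} COMPUTE STATISTICS FOR COLUMNS {column_list}"
```

The returned string could then be passed to spark.sql(...) for each table described in the metadata.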
Those would be my alternatives.
I hope this is of help to you.
Regards!
10-31-2024 01:24 AM
Are these responses generated by ChatGPT? They don't answer the question and very much have the tone of generative AI.
10-31-2024 02:17 AM
It is not generated with AI. I am simply trying to give you an alternative since, as discussed above, statistics are not collected automatically beyond the first 32 columns. As you already know, that is the current limitation.