grpeternel
Databricks Employee

Reliability, consistency, reusability, and performance are the four core pillars of a resilient and robust Data Architecture. Delta Lake is the technical backbone for ensuring data is readily available to support and solve the world’s toughest problems. Delta Lake brings reliable ACID transactions, consistent batch and streaming flexibility, reusable Lakehouse data across Data & AI use cases, and performant optimizations for faster data retrieval and consumption. With great power comes great responsibility to ensure these features are properly integrated. Databricks provides both manual and automatic delivery of these Delta Lake techniques.

Predictive Optimization (PO) best practices

The Databricks Intelligence Platform leverages Unity Catalog (UC) and Databricks AI & Machine Learning to intelligently predict the best data optimization model to perform on your data. Databricks recommends adopting the PO best practice for UC managed tables so Delta Lake tables are tuned automatically, simplifying data maintenance and reducing storage costs. Refer to the official Databricks documentation for more details here.
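
For workspaces that do have access, enabling PO is a single SQL statement at the catalog or schema level. A minimal sketch, assuming a Unity Catalog catalog named main (the name is a placeholder):

%python
# Minimal sketch: enable Predictive Optimization for an entire catalog (catalog name is a placeholder)
spark.sql("ALTER CATALOG main ENABLE PREDICTIVE OPTIMIZATION")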

This blog aims to be useful for customers who:

  1. Are in cloud regions that do not yet have access to PO
  2. Primarily use external tables
  3. Prefer the flexibility to manually fine-tune their Lakehouse for unique use cases

It also serves as additional learning and education material for Databricks users.

Delta Lake features

Delta Lake provides many data optimization techniques to effectively manage the underlying data files in Delta Lake’s storage layer. In this blog we will build a blueprint of reusable, modularized functions that combine these optimization methods to iterate through bronze, silver, and gold Delta Lake tables at scale. The objective is to boost query performance while efficiently managing big data and costs at the Lakehouse level. The core optimization operations to be discussed are:

  • Liquid Clustering
  • Target File Size
  • Optimize
  • Vacuum
  • Analyze

Let’s briefly summarize each technique prior to combining them into reusable functions.

Liquid Clustering

Liquid Clustering (LQ) eliminates the need to perform traditional file folder partitioning and ZORDER operations. It is recommended to select key fields that are commonly used in query filters. LQ’s CLUSTER BY columns can also be changed on an adaptive, ad-hoc basis without having to rewrite the data (i.e. rebuild the Delta Lake table) and restructure the folder layout as PARTITION BY requires. This provides more flexibility and compute cost savings. Currently LQ allows clustering data by up to 4 keys.

The SQL syntax is as follows: ALTER TABLE <table_name> CLUSTER BY (<clustering_cols>);

To remove keys the SQL syntax is as follows: ALTER TABLE <table_name> CLUSTER BY NONE;

In Databricks Runtime 16.0 and above, Structured Streaming .writeStream() supports LQ via .clusterBy(<clustering_cols>) for Spark Python (PySpark) and Spark Scala.
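
As an illustration, here is a minimal sketch of a streaming write that liquid clusters the target table on write; it assumes DBR 16.0+, and the source, clustering key, checkpoint path, and table name are placeholders:

%python
# Minimal sketch (assumes DBR 16.0+); source, clustering key, checkpoint path, and table name are placeholders
(spark.readStream
    .format("rate")                      # simple built-in streaming source, for illustration only
    .load()
    .writeStream
    .clusterBy("timestamp")              # liquid cluster the target table by this key
    .option("checkpointLocation", "/tmp/_checkpoints/rate_demo")
    .trigger(availableNow=True)
    .toTable("main.default.rate_demo_bronze"))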

Refer to the official Databricks documentation for full API params & more details here.

Target File Size

Target File Size (TFS) is a Delta Lake table property (delta.targetFileSize) that provides the flexibility to specify the desired size of the data files in the root Delta Lake table directory. It ensures Delta Lake data files are written to storage at approximately the specified size. Typically, each layer of the Medallion Architecture will have a different TFS to suit specific needs, based on a data design pattern showcased here.

The property can be set either via a Spark configuration or a Delta Lake ALTER TABLE SQL statement:

  1. spark.conf.set("spark.databricks.delta.properties.defaults.targetFileSize", "<tfs_value>")
  2. ALTER TABLE <table_name> SET TBLPROPERTIES (delta.targetFileSize = '<tfs_value>');
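
To confirm the property took effect, a minimal sketch that inspects the table properties (the table name is a placeholder):

%python
# Minimal sketch: verify the target file size property on a table (table name is a placeholder)
spark.sql("SHOW TBLPROPERTIES main.default.sales_silver ('delta.targetFileSize')").show(truncate=False)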

Refer to the official Databricks documentation for full API params & more details here.

Optimize

The OPTIMIZE command triggers Spark jobs that compact and rewrite existing data files. For example, running the command will execute LQ (if set) and cluster the Delta Lake table by its clustering key(s), as well as apply all table properties (if set, e.g. target file size) accordingly. The output is new compacted and clustered data files in the root Delta Lake table directory, with the objective of improving read and write performance.

The SQL syntax is as follows: OPTIMIZE <table_name> [WHERE predicate];

Spark Python & Spark Scala also provide support via the Delta Lake API, as shown in the sketch below.
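
For example, a minimal sketch via the Delta Lake Python API (the table name is a placeholder):

%python
# Minimal sketch via the Delta Lake Python API (table name is a placeholder)
from delta.tables import DeltaTable

delta_tbl = DeltaTable.forName(spark, "main.default.sales_silver")
delta_tbl.optimize().executeCompaction()  # compacts data files, honoring table properties such as delta.targetFileSize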

Refer to the official Databricks documentation for full API params & more details here.

Vacuum

The VACUUM command triggers Spark jobs to delete and remove excess stale data files in the root Delta Lake table directory that are no longer needed for queries, for example, old data files from previous Delta Lake versions that remain after running the OPTIMIZE command. The default vacuuming threshold is a retention period of 7 days, meaning anything older than 7 days will be wiped clean. This can be adjusted accordingly; however, be careful with this setting because once the data files are eliminated, they cannot be recovered! Fun fact: all directories that start with an underscore (i.e. _delta_log) are skipped during vacuuming. Therefore, if you’re storing streaming checkpoints in the root of the Delta Lake table directory, be sure to name them starting with an underscore (i.e. _checkpoints) to prevent metadata from being erased.

The SQL syntax is as follows: VACUUM <table_name> [RETAIN num HOURS];

Please be sure to review the retention threshold and tailor accordingly.

Spark Python & Spark Scala also provide support via the Delta Lake API, as shown in the sketch below.
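
For example, a minimal sketch via the Delta Lake Python API (the table name is a placeholder; review the retention threshold first):

%python
# Minimal sketch via the Delta Lake Python API (table name is a placeholder)
from delta.tables import DeltaTable

delta_tbl = DeltaTable.forName(spark, "main.default.sales_silver")
delta_tbl.vacuum()      # uses the default 7-day retention threshold
# delta_tbl.vacuum(168) # or pass an explicit retention period in hours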

Refer to the official Databricks documentation for full API params & more details here.

Analyze

The ANALYZE TABLE command computes estimated statistics on Delta Lake table columns, by default the first 32 columns in the schema. The statistics are used by the query optimizer to compile a better query plan.

The column statistics can be adjusted via Delta Lake table properties (delta.dataSkippingNumIndexedCols and delta.dataSkippingStatsColumns). See full details here. These properties can be set either via a Spark configuration or a Delta Lake ALTER TABLE SQL statement:

  1. spark.conf.set("spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols", "<value>")
  2. ALTER TABLE <table_name> SET TBLPROPERTIES (delta.dataSkippingStatsColumns = '<comma_sep_cols>');

ANALYZE TABLE can take some time to complete, so it is recommended to run this command as a separate operation after optimization jobs to avoid slowing down their execution time and impacting SLAs.

The SQL syntax is as follows: ANALYZE TABLE <table_name> COMPUTE DELTA STATISTICS [FOR ALL COLUMNS];
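
As an illustration, a minimal sketch of running ANALYZE as its own follow-up pass over a list of tables (the table names are placeholders):

%python
# Minimal sketch: run ANALYZE as a separate post-optimization pass (table names are placeholders)
for tbl in ["main.sales.orders_bronze", "main.sales.orders_silver", "main.sales.orders_gold"]:
    print(f"Analyzing delta lake table: {tbl}")
    spark.sql(f"ANALYZE TABLE {tbl} COMPUTE DELTA STATISTICS")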

Refer to the official Databricks documentation for full API params & more details here.

Development

Now we will walk through the code to build the combined optimizations. The Databricks notebooks contain five Databricks Widgets as reusable parameters: Catalog Level Name, Database Level Name, Liquid Clustering Column(s), Target File Size, and Medallion Layer.

%python
dbutils.widgets.text("cat_level_name", "enter_catalog_level_name", "Catalog Level Name")
dbutils.widgets.text("db_level_name", "enter_database_level_name", "Database Level Name")
dbutils.widgets.text("lq_cols", "enter_liquid_clustering_columns", "Liquid Clustering Column(s)")
dbutils.widgets.text("tfs_value", "enter_target_file_size_value", "Target File Size Value")
dbutils.widgets.text("medal_layer", "enter_medallion_layer", "Medallion Layer")

Here are the variables from the Widgets that will be passed into the reusable functions.

%python
cat_level_name = dbutils.widgets.get("cat_level_name"); print(f"catalog level name: {cat_level_name}")
db_level_name = dbutils.widgets.get("db_level_name"); print(f"database level name: {db_level_name}")
lq_cols = dbutils.widgets.get("lq_cols"); print(f"liquid clustering column(s): {lq_cols}")
tfs_value = dbutils.widgets.get("tfs_value"); print(f"target file size value: {tfs_value}")
medal_layer = dbutils.widgets.get("medal_layer"); print(f"medallion layer: {medal_layer}")

The first function queries and collects the Delta Lake tables from their respective UC catalog and database via the Spark SQL SHOW TABLES command. A filter keeps only tables whose names match the medallion layer set in the widget. A Python list (or Scala sequence) is returned containing all the bronze, silver, or gold Delta Lake tables.

%python
def query_collect_exec(cat_level_name: str, db_level_name: str, medal_layer: str) -> list:
    """
    Retrieve a list of Delta Lake tables from the specified catalog and database that match the given medal layer pattern.

    Args:
        cat_level_name (str): The catalog level name.
        db_level_name (str): The database level name.
        medal_layer (str): The layer to match table names against.

    Returns:
        list: A list of Delta Lake table names that match the medal layer.
    """
    obj_level_name = f"{cat_level_name}.{db_level_name}"
    tables = (spark
              .sql(f"SHOW TABLES IN {obj_level_name}")
              .filter("isTemporary = false")
              .filter(f"tableName LIKE '%{medal_layer}%'")
              .select("tableName")
              .collect())
    return [row.tableName for row in tables]

The second function performs the core combined optimization techniques per Delta Lake table. It runs the Delta Lake DESCRIBE DETAIL command to check that the object is stored in Delta format. If Delta Lake files exist, the four core optimization techniques are executed in sequence.

%python
def combine_tune_exec(cat_level_name: str, db_level_name: str, tbl_level_name: str, lq_cols: str, tfs_value: str) -> None:
    """
    Tune a Delta Lake table by liquid clustering, target file size, optimizing, and vacuuming.

    Args:
        cat_level_name (str): The catalog level name.
        db_level_name (str): The database level name.
        tbl_level_name (str): The table level name.
        lq_cols (str): The column(s) to cluster by.
        tfs_value (str): The target file size value.

    Returns:
        None
    """
    obj_level_name = f"{cat_level_name}.{db_level_name}.{tbl_level_name}"
    delta_lake_files = (spark.sql(f"DESCRIBE DETAIL {obj_level_name}").filter("format = 'delta'").count() > 0)

    if delta_lake_files:
        print(f"Liquid clustering delta lake table: {obj_level_name} by {lq_cols}")
        spark.sql(f"ALTER TABLE {obj_level_name} CLUSTER BY ({lq_cols})")

        print(f"Setting target file size delta lake table: {obj_level_name} at {tfs_value}")
        spark.sql(f"ALTER TABLE {obj_level_name} SET TBLPROPERTIES ('delta.targetFileSize' = '{tfs_value}')")

        print(f"Optimizing delta lake table: {obj_level_name}")
        spark.sql(f"OPTIMIZE {obj_level_name}")

        print(f"Vacuuming delta lake table: {obj_level_name}")
        spark.sql(f"VACUUM {obj_level_name}")
    else:
        print(f"Skipping non-Delta Lake table: {obj_level_name}")

The third function calls the second function and applies the execution by iterating over the Python list (or Scala sequence) of Delta Lake tables. With elastic, distributed Databricks compute you will be tuning multiple Delta Lake tables at scale. To boost speed and performance, consider using Photon for larger data volumes.

%python
def pass_iterate_exec(delta_lake_tables: list, cat_level_name: str, db_level_name: str, lq_cols: str, tfs_value: str) -> None:
    """
    Main function for tuning multiple Delta Lake tables; calls the `combine_tune_exec` function for each table in the list.

    Args:
        delta_lake_tables (list): A list of Delta Lake table names to tune.
        cat_level_name (str): The catalog level name.
        db_level_name (str): The database level name.
        lq_cols (str): The column(s) to cluster by.
        tfs_value (str): The target file size value.

    Returns:
        None
    """
    for tbl_level_name in delta_lake_tables:
        combine_tune_exec(cat_level_name, db_level_name, tbl_level_name, lq_cols, tfs_value)

Here is an example of how to execute the functions.

%python
delta_lake_tables = query_collect_exec(cat_level_name, db_level_name, medal_layer)
pass_iterate_exec(delta_lake_tables, cat_level_name, db_level_name, lq_cols, tfs_value)

Deployment

A deployment best practice is to schedule the reusable functions as notebook tasks in a Databricks Workflow, running as an automated batch job customized to your use case. For example, as a blueprint, a single job with three tasks (bronze, silver, and gold) optimizes each layer of the medallion architecture accordingly. A good starting point is to clone the workflow per catalog and database. Running daily, before or after business hours, ensures all Delta Lake tables are regularly optimized and readily available for users to query.
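
One way to wire this up programmatically is with the Databricks SDK for Python. The sketch below is illustrative only; the job name, schedule, notebook path, cluster id, catalog/database names, clustering columns, and file size values are all placeholder assumptions:

%python
# Illustrative sketch using the Databricks SDK for Python; all names, paths, ids, and values are placeholders
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()
w.jobs.create(
    name="delta-lake-maintenance",
    schedule=jobs.CronSchedule(quartz_cron_expression="0 0 5 * * ?", timezone_id="UTC"),  # daily, before business hours
    tasks=[
        jobs.Task(
            task_key=layer,
            existing_cluster_id="<cluster-id>",
            notebook_task=jobs.NotebookTask(
                notebook_path="/Workspace/Repos/<user>/delta_lake_tuning",
                base_parameters={
                    "cat_level_name": "main",
                    "db_level_name": "sales",
                    "medal_layer": layer,
                    "lq_cols": "<clustering_cols>",
                    "tfs_value": tfs,
                },
            ),
        )
        for layer, tfs in [("bronze", "64mb"), ("silver", "200mb"), ("gold", "1gb")]
    ],
)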

Example Databricks Workflow Tasks


Each task will set the key:value parameters tailored to its medallion layer. For bronze, a target file size of 64 MB is a solid starting point because bronze data is typically an append-only stream that serves as a system of record, so stellar query performance is usually not required and the layer is rarely accessed by end users. For silver, a target file size of 200 MB is a reliable starting point because silver data is curated and consumed by data scientists and analysts performing exploratory data analysis and machine learning. For gold, a target file size of 1 GB is a good starting point because gold data is further sliced and aggregated for dashboards and KPI reports that drive business decisions and provide data intelligence to stakeholders.

Example Databricks Workflow Parameters


Conclusion

This material aims to simplify Lakehouse data management and serve as a starter kit for efficiently and cost-effectively building a Delta Lake maintenance automation schedule that ensures all Delta Lake tables are readily available to power your Data+AI users and business solutions. Customers using UC managed tables are encouraged to leverage PO; however, Databricks provides the flexibility of choice between manual and automatic tuning. Please do not hesitate to adjust these functions to your use cases, Medallion Architecture naming conventions, and additional layers if applicable.

Thank you for reading this blog. DBC file notebooks (Spark Python & Spark Scala) with the code can be found in the GitHub repo here.