cancel
Showing results for 
Search instead for 
Did you mean: 
Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
cancel
Showing results for 
Search instead for 
Did you mean: 
irfan_elahi
Databricks Employee
Databricks Employee

Imagine a world where building complex data pipelines is as simple as describing what you want rather than meticulously coding how to do it. This is one of the key propositions that Delta Live Tables (DLT) brings to the table. As a declarative framework, DLT transforms how we approach ETL (Extract, Transform, Load) processes by specifying what needs to be done instead of how it needs to be done. Additionally, It addresses the common headaches of data engineering – complex code, manual optimizations, dependency management, unifying batch and streaming, enhanced auto-scaling, infrastructure management, etc – by offering a more simplified yet efficient method to create reliable data pipelines. While Delta Live Tables (DLT) optimizes and simplifies many aspects, there are still strategic techniques and best practices you can employ to make your pipelines even more efficient. In this blog, we'll dive into the top tips and tricks to help you get the most out of DLT, ensuring your data workflows are as robust and performant as possible.

Beyond the Basics

If you've been using Delta Live Tables (DLT) to build robust data pipelines, chances are you’re already familiar with some of the core best practices.

  • Use Photon: Leverage the vectorized processing engine, written in C++ from the ground up by Databricks with record-breaking performance, to optimize your ETL while maximizing TCO. This documentation link covers the key features and supported operators, expressions, and data types.

  • Use Serverless: Using Serverless not only abstracts the complexity of configuring the resources on which your DLT pipeline will run, e.g., specifying minimum and maximum workers for your nodes; it also significantly reduces the pipeline start-up time from minutes to seconds. Also, all the latest enhancements, e.g., incremental MV refreshes (via the underlying Enzyme engine), and micro-batch pipelining (that optimizes the overall throughput by executing many micro-batches concurrently), to name a few, will be exclusively offered on DLT serverless.

  • Use Enhanced Auto Scaling: When using DLT Classic, i.e., in non-serverless mode, enhanced auto-scaling optimizes the utilization of the compute resources by scaling the cluster up or down as and when required in response to the micro-batches waiting to be processed in the queue while considering the task utilization of the existing clusters in-use. Auto-scaling has been there in Databricks for a long time, but it has been significantly enhanced for DLT, thus ultimately resulting in the best price/performance for your workloads.

With the basics out of the way, let’s look at the next set of top 5 tips to build DLT pipelines optimally.

Tip # 1 - Make the Most of Pipeline Compute Settings

If you already use DLT serverless, you are a rockstar and can skip this tip. In case you are still using DLT classic, which runs on a cluster of VMs, you can make use of the Pipeline compute settings to further optimize pipelines. 

1.1 - Reduce Start-up Time Using Cluster Pools:

One of the biggest advantages Serverless provides is reduced start-up time. Instead of waiting for minutes for your VMs to be launched in your cloud account and be ready to run your DLT pipeline, which may take minutes, Serverless reduces it to seconds. For latency sensitive workloads, this is quite important. However, if you are using DLT classic, you can still reduce cluster start-up time significantly by using cluster pools.

Databricks Cluster Pools optimize cluster startup times for workloads by maintaining a cache of pre-warmed virtual machine instances, allowing clusters to quickly acquire resources without waiting for new instances from the cloud provider. This feature significantly reduces startup time, particularly beneficial for short, automated jobs. Pools manage the lifecycle of these instances efficiently, offering cost savings by only incurring cloud provider costs for idle instances without additional Databricks Unit (DBU) charges. By enabling faster startup and dynamic scaling, Databricks Pools enhance the performance and efficiency of data processing tasks in data pipelines. One call-out is that tags are managed slightly differently in Pools. This documentation link contains more details on that.

If you have already created a pool, you can specify it in the DLT pipeline settings. Although this option is not available in the UI, you can simply toggle from UI to the JSON view of the DLT pipeline settings and specify it there. For instance, here's a JSON view of a DLT pipeline that uses cluster pools for its compute as signified by instance_pool_id key:

{
    "id": "95a97351-efda-4b4d-a0ec-ac9711270e3b",
    "pipeline_type": "WORKSPACE",
    "clusters": [
        {
            "label": "default",
            "instance_pool_id": "0719-040640-noble119-pool-q2q6878f",
            "autoscale": {
                "min_workers": 1,
                "max_workers": 2,
                "mode": "ENHANCED"
            }
        }
    ],
    "development": false,
    "continuous": false,
    "channel": "CURRENT",
    "photon": true,
    "libraries": [
        {
            "notebook": {
                "path": "/path/to/notebook"
            }
        }
    ],
    "name": "dlt_pipeline_name",
    "edition": "ADVANCED",
    "catalog": "catalog",
    "target": "schema",
    "data_sampling": false
}

One key call-out here is that while creating pools, specify None in the "Preloaded Databricks Runtime Version" setting.

Screenshot 2024-08-22 at 2.45.32 PM.png

 Although selecting this option helps speed up cluster launches as the DBR is pre-loaded on idle instances in the pool, DLT uses its own custom DBR, so the option shouldn't be selected.

1.2 - Use Bigger Driver Node If Required

If you are running a lot of flows (i.e., processing streaming tables and materialized views) in your DLT pipeline, the driver node can come under strain. As DLT is built on the battle-tested foundations of structured streaming, the driver node plays a crucial role in streaming workloads such as:

  • Query planning and scheduling
  • Checkpointing
  • Kafka source administration
  • Delta transaction log administration
  • Broadcasting
  • Keeping track of metrics

If you observe in cluster metrics section that the CPU and memory utilization of driver node is exhausting, it could be due to the above mentioned reasons. Thus, in such circumstances, it is recommended to use a bigger driver node. As mentioned in the documentation, you can specify compute settings for the clusters. Specifically, you can specify all the cluster attributes except a few.  A good reference is the create cluster REST API endpoint, which lists the cluster attributes one can use. Using this, one can specify a bigger driver node as per requirement in the clusters part of your pipeline settings JSON:

{
  "clusters": [
    {
      "label": "updates",
      "node_type_id": "i3.xlarge",
      "driver_node_type_id":"i3.2xlarge",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    },
    {
      "label": "default",
      "spark_conf": {
         "key": "value"
      }
    }
  ]
}

The same concept, i.e. using the compute settings of DLT pipeline, can be used to achieve the following benefits:

  • If your organization has discounts on specific VMs, e.g. if you have reserved instances in AWS, you can try using those VMs in the DLT pipelines by using compute settings.
  • If you want to run init scripts in your cluster, specify those in the compute settings of the DLT pipeline, and the launched cluster will have those executed at start-up time.

Bonus Tip

If you use Production mode in DLT and want to reuse the cluster for subsequent updates, it's not possible by default, as it launches a new cluster for every update. This ensures recycling so that a freshly provisioned cluster is available to run each update. However, it adds an overall delay in execution as for each update, there will be a cluster-startup time (which can be reduced with cluster pools to a greater extent). Thus, if you want to reuse the existing cluster in Production Mode, you can specify a higher value for this config in Pipeline configuration, e.g. 120 minutes:

"pipelines.clusterShutdown.delay": "120m"

By default, for Production Mode, it is 0 seconds, so it launches a new cluster for every update (for development, the default value is 2 hours already).

Tip # 2 - Avoid Persisting Intermediate Data Needlessly

Writing or persisting data intermittently, especially when it is not necessary, is generally discouraged while building ETL pipelines. This can significantly increase storage costs as more space is consumed over time, and it can introduce latency. Additionally, each write operation consumes compute resources, potentially affecting other operations and overall system performance. 

In DLT, you primarily work with three types of objects w.r.t persistence:

  • Persistent tables: These could be either streaming tables or materialized views. If you are using Python, these tables are defined by the decorator (@dlt.table). These are also published in UC.
  • Temporary Tables: When the boolean temporary keyword is set to true, a table is created but not published to UC. It remains available to the pipeline but is not accessible outside the pipeline. To reduce processing time, a temporary table persists for the pipeline's lifetime that creates it, not just a single update. However, the data is still persisted under the hood. These are helpful if you are performing complex transformations and want to reuse them multiple times in the downstream flows.
  • Views: These are similar to "temporary views" in PySpark. They are not published to UC and are not accessible outside the pipeline, and the results do not persist.

Thus, if you have a requirement where you want to specify a transformation logic on tables to be used downstream, and you don't need that dataset to be published to UC,  it is recommended to adhere to using Views instead of the other two options based on the reasons mentioned previously.

Here is an example scenario. Consider that you are using Auto Loader to incrementally process raw files and create bronze tables with minimum to zero transformations, thus closely resembling the source. However, you want to apply certain transformations, e.g., timestamp conversions, type-casting, etc. before merging into a target table using SCD Type 1 or 2 employing APPLY CHANGES INTO API. In this case, you can define a view on the bronze table, specify the necessary logic, and then use that view as a source for APPLY CHANGES INTO. This will prevent persisting the intermediate dataset primarily used for the merge operation.

import dlt
import pyspark.sql.functions as F


@dlt.table(name=f"bronze_calendar")
def load_data():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("delimiter", ",")
        .load(f"/Volumes/catalog/schema/volume_name/raw/calendar/")
    )


@dlt.view(name="vw_bronze_calendar")
def silver_calendar():
    return (
        dlt.read_stream("bronze_calendar")
        .withColumn("Date", F.to_date(F.col("Date"), "MM/dd/yy"))
        .withColumn("Year", F.col("Year").cast("int"))
    )


dlt.create_streaming_table(f"silver_calendar")

dlt.apply_changes(
    target=f"silver_calendar",
    source=f"vw_bronze_calendar",
    keys=pk,
    sequence_by=F.col("updated_timestamp"),
    stored_as_scd_type=2,
)

Tip # 3 - Don’t Overlook Key Performance Optimization Techniques

The Databricks platform is constantly undergoing significant simplification, largely driven by AI optimizations powered by DatabricksIQ, which aim to eliminate configuration complexities and reduce maintenance overhead. Delta Live Tables also embodies this approach by abstracting many configurations, allowing data engineers to focus on delivering value. However, there are still scenarios where manual performance tuning is necessary to achieve optimal results.

While defining a DLT table, it is possible to specify a number of properties that, if configured properly, can positively improve DLT pipelines’ performance. For instance:

  • pipelines.autoOptimize.managed (Default: true):  Enables or disables automatically scheduled optimization of this table. This config automatically compacts small files and optimizes data layout for improved query performance. It helps reduce the need for manual OPTIMIZE operations by automatically managing file sizes and data organization as new data is processed. If latency is not a critical concern, it is suggested that it be left to its default True value.
  • pipelines.autoOptimize.zOrderCols (Default: None):  An optional string containing a comma-separated list of column names to z-order this table by. Z-ordering is a technique in Delta Lake that collocates related data in the same set of files, improving query performance by enabling more efficient data skipping. This is particularly helpful for queries against the table involving filter predicates. Going forward, Liquid Clustering will replace Z Order but it is not fully supported in DLT yet. Until it is fully supported, it is recommended to specify columns well suited for Z-order, e.g., high cardinality columns, which are extensively used in filter predicates.
  • pipelines.reset.allowed (Default: true):  Controls whether a full refresh is allowed for this table. If you are ingesting data from message buffers, e.g., Kafka, which typically have low retention periods, it is recommended to set this to False for bronze tables. Usually, bronze tables resemble sources with ETL logic minimally specified. Also, at times one may need to perform a full-refresh of the DLT pipeline, e.g., after altering the ETL logic of downstream layers. However, doing a full-refresh practically drops-recreates the table (along with other things, e.g., clearing checkpoints) and for bronze tables, it could be a concern. With this config specified as False for the bronze table, when you do a full refresh of the DLT pipeline, data in Bronze tables will be retained.

Additionally, the following Delta properties have been observed to be commonly used as well:

  • delta.targetFileSize: The target file size in bytes or higher units for file tuning. For example, 104857600 (bytes) or 100mb. Getting this right can directly impact data-skipping performance.
  • delta.logRetentionDuration: The retention period for a Delta table's history determines how long past records are kept. VACUUM operations can override this period. Delta Lake automatically removes log entries older than the retention period each time a checkpoint is written. Setting a longer retention period results in more log entries being retained, but it generally doesn't affect performance since log operations occur in constant time. However, while processing in parallel, operations involving the table's history may become more costly as the log size grows.
  •  delta.columnMapping.mode (default: None): Column mapping mode allows Delta table columns and underlying Parquet file columns to use different names, enabling schema evolution operations like renaming or dropping columns without rewriting data files

3.1 Automating Table Properties Specification

Managing table properties can become tedious when you need to set them individually for every table. Often, users want a way to apply default properties across all tables at the pipeline level, avoiding repetitive code. Unfortunately, setting global table properties directly through Spark configurations doesn’t work with DLT. To solve this, we can create a custom wrapper around the dlt.table decorator that automatically injects a set of default table properties while still allowing flexibility for specific tables to override them. This approach streamlines the pipeline configuration, making it easier to manage and maintain. Below is an example of how to implement this solution effectively in Python:

import dlt

# Define a custom decorator to add default table properties
def create_dlt_table(*args, **kwargs):
    # Set the default properties to be applied to all tables
    default_properties = {"DEFAULT_KEY1": "DEFAULT_VALUE1"}

    # Extract any table-specific properties provided by the user
    custom_properties = kwargs.pop("table_properties", {})

    # Merge default properties with custom properties, giving precedence to custom ones
    combined_properties = {**default_properties, **custom_properties}

    # Return the dlt.table with the combined properties
    return dlt.table(*args, **kwargs, table_properties=combined_properties)

# Example usage of the custom decorator for a DLT table
@create_dlt_table(name="sample_table", table_properties={"CUSTOM_KEY2": "CUSTOM_VALUE2"})
def define_table_one():
    return dlt.read_stream("cloudFiles")...

# Another example showing how custom properties can override defaults
@create_dlt_table(table_properties={"DEFAULT_KEY1": "OVERRIDE_VALUE", "CUSTOM_KEY2": "CUSTOM_VALUE2"})
def define_table_two():
    return spark.sql("SELECT * from catalog.schema.some_table")

Tip # 4 - Consider using flows and sinks API for your streaming tables

A flow is a streaming query designed to process source data incrementally, updating a target streaming table without needing to reprocess the entire dataset. This incremental approach can be highly efficient, especially when working with large datasets or continuous streams of data. Flows can operate automatically in some cases, for instance, when you create a query that directly updates a streaming table, a flow is often generated behind the scenes without needing explicit configuration.

Understanding the mechanics of flows is crucial for optimizing your data pipelines. Append flows allow data to be added or merged incrementally rather than requiring a full-table refresh every time new data becomes available. This is particularly useful for real-time or near-real-time applications where continuous updates to data are required. By leveraging append flows, you can significantly reduce processing overhead and cost, while also improving the responsiveness of your pipeline.

Scenario 1: When managing data streams, business needs or data requirements often evolve. For example, you may start with a few streaming data sources but find the need to integrate additional streams over time. Instead of creating a brand new streaming table and reprocessing all the data (which can be computationally expensive and time-consuming), you can use an append flow to simply add the new streaming sources to the existing table. The append flow integrates these new sources incrementally, without having to perform a full refresh. This is particularly helpful for large or continuously growing datasets, as it ensures minimal disruption while keeping your pipeline adaptable to changes in data sources.


 @Dlt.append_flow(target = "kafka_target")
 def new_stream_topic():
   return (
     spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "host1:port1,...")
       .option("subscribe", "new_stream_topic")
       .load()
   )

Scenario 2: In traditional pipelines, combining multiple datasets often requires performing a UNION operation, which consolidates rows from different queries into a single result set. While effective, UNION operations can be computationally expensive, especially if you’re working with large datasets or performing frequent updates. Instead of relying on costly UNIONS, append flows allow you to achieve the same result by incrementally merging new data into the target table as it becomes available. This process reduces the need for full-refresh updates, meaning that only the new data is processed and appended. The result is a much more efficient pipeline that saves on both time and computational resources, while still maintaining up-to-date results in the target table.

 @Dlt.append_flow(target = "csv_target")
 def backfill():
   return spark.readStream
     .format("cloudFiles")
     .option("cloudFiles.format","csv")
     .load("path/to/backfill/data/dir")

In both scenarios, append flows improve the scalability and efficiency of data pipelines by minimizing the need for full-table updates. This can lead to significant cost savings, especially when dealing with cloud resources or large-scale, real-time data processing systems.

Tip # 5 - Choose the appropriate file detection mode

Delta Live Tables allow us to use Auto Loader for most data ingestion tasks from cloud object storage. Auto Loader and Delta Live Tables work together to incrementally and idempotently ingest and process data as it arrives. 

To keep track of new files, Auto Loader employs several different methods to monitor the input directory for new events . By default, this is done through Directory Listing. The other approach is to use the File notification mode.

Directory Listing Mode operates by periodically scanning the input directory to detect new files for ingestion. It relies solely on listing files without the need for additional cloud services or event-based notifications, which makes it easy to set up and configure. One of the main benefits of directory listing is the ability to control ingestion frequency. You can schedule scans based on your data processing needs, which is useful if you want to limit the volume of data being ingested at once. Additionally, directory listing is efficient when dealing with large files, as fewer files mean less frequent scans and lower processing overhead. However, there are some drawbacks to this method, such as latency—since it scans at intervals, there can be delays between file arrival and ingestion. This method also lacks scalability, as large numbers of files can lead to longer scanning times and higher compute costs. Directory listing is best suited for scenarios involving large files that don’t require real-time processing or when you need to control ingestion timing. Also note that your cloud provider may charge you for LIST operations on your cloud storage, making this an expensive option for low latency, high volume use cases.

# Define the source path
source_path = "path/to/your/directory"


# Set up the Auto Loader stream
df = (spark.readStream
    .format("cloudFiles")
    # Define your format
    .option("cloudFiles.format", "binaryFile")
    # false = Directory listing mode, true = File notification mode
    # by default Directory listing mode is used
    .option("cloudFiles.useNotifications", "false") 
    .load(source_path))

File Notification Mode, on the other hand, offers a more scalable and performant solution for real-time, high-volume data ingestion. Instead of scanning the directory, this mode uses event-driven notifications from cloud storage to detect new files as soon as they arrive, making it ideal for real-time ingestion where low latency is essential. This method is especially efficient for large volumes of files, as it avoids the overhead of scanning directories and instead relies on notifications for immediate processing. File notification integrates with cloud services like AWS SQS, Azure Event Grid, and Google Pub/Sub, but it requires a more complex setup and monitoring of resource limits, such as the number of queues or notifications allowed by your cloud provider. 

Note that autoloader automatically sets up a notification and queue service that subscribes to file events, for single user compute only. For use with DLT, manual setup is needed. You can then specify the configuration as part of your pipeline:

# Define the source path
source_path = "path/to/your/directory"


val df = spark.readStream.format("cloudFiles")
   # Define your format
   .option("cloudFiles.format", "binaryFile")
    # false = Directory listing mode, true = File notification mode
   .option("cloudFiles.useNotifications", "true")
   # queue details
   .option("cloudFiles.region", "myRegion")
   .option("cloudFiles.queueUrl", "https://sqs.myRegion.amazonaws.com/...")
   .load(source_path)

Refer to the summary table below to determine which mode is better suited for your use case:

 

Directory Listing

File Notification Mode

Ingestion Type

Periodic, scheduled ingestion

Event-driven, real-time ingestion

Setup Complexity

Simple, no additional services required

Complex, requires configuring cloud-based notifications

Latency

Higher latency due to periodic scanning

Low latency, immediate file detection

Efficiency with Small Files

Less efficient, scans entire directory

Highly efficient, processes only new files

Efficiency with Large Files

More efficient, suited for larger files

Can handle large files, but more effective with small files

Scalability

Limited scalability as the number of files increases

High scalability for large directories and frequent file events


Conclusion

Delta Live Tables (DLT) offers a simplified approach in building efficient data pipelines through a declarative framework, minimizing the need for complex coding and manual optimizations. Key tips for optimizing DLT pipelines include using Databricks’ Photon engine for enhanced processing, leveraging serverless architecture for reduced startup times, and taking advantage of auto-scaling features. Other strategies involve optimizing pipeline compute settings by utilizing cluster pools and bigger driver nodes, avoiding unnecessary data persistence, and fine-tuning performance with specific table properties. Additionally, using flows and append operations for efficient data streaming and choosing the right file detection mode—whether directory listing or file notification—can significantly improve the scalability and cost-efficiency of data pipelines. These best practices ensure robust, high-performing ETL processes with DLT.