cancel
Showing results for 
Search instead for 
Did you mean: 
Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
cancel
Showing results for 
Search instead for 
Did you mean: 
val_data_eng
Databricks Employee
Databricks Employee

Delta Live Tables is a highly popular tool for simplifying the creation of reliable and maintainable data pipelines among our customers. It is an ETL declarative framework that allows creating Materialized Views and Streaming Tables in a reliable and maintainable way. However, a limitation is that users are unable to fully utilize Delta Sharing due to the lack of support for Materialized Views and Streaming Tables. To address this, we are currently developing a workaround that will enable users to continue benefiting from Delta Live Tables while also being able to share their data with third-party tools.

Synchronization between DLT and Delta tables:

In this post, we will discuss how to share data from a DLT pipeline, covering the two different types of tables you can have in DLT:

  • Materialized Views
  • Streaming Tables

In our examples, we will refer to the tables from the DLT loan demo pipeline. In particular: 

  • raw_txs is a streaming table
  • total_loan_balances is a materialized view

The DLT plan for the Loan demo pipelineThe DLT plan for the Loan demo pipeline

 

Materialized Views

A Materialized View in Databricks is a data object that stores the precomputed results of a query as a physical table. The main benefit is the reduction of the frequency and complexity of the query result calculation, resulting in:

  • faster query performance 
  • reduced workloads
  • saving computational resources.

Materialized Views (MV) are automatically and incrementally updated using DLT pipelines. However, MVs cannot be updated incrementally using Structured Streaming.

To Delta Share the content of a MV  we have 3 options:

  1. Use “CREATE TABLE AS” (CTAS) statement from the MV to copy-create a table in delta format.
  2. Remove the MV from DLT and re-create the aggregation logic in SQL/spark and save the output as a Delta table  
  3. Use a MERGE-UPSERT statement to update a target delta table with the content of the MV

We can set this process out of DLT as a task in Workflows using a Notebook task.
Note that all of the above approaches would not work in a continuous stream scenario, instead the table update is expected to be executed at regular intervals. 

 

Option 1 - CREATE TABLE AS

Using the previously discussed Materialized View called 'total_loan_balances', we can efficiently create our table as follows:

CREATE OR REPLACE TABLE my_catalog.dlt_loans.total_loan_balances_delta
USING DELTA
TBLPROPERTIES (
 'delta.autoOptimize.optimizeWrite' = 'true',
 'delta.autoOptimize.autoCompact' = 'true',
 'delta.enableChangeDataFeed' = 'true',
 'delta.tuneFileSizesForRewrites' = 'true'
)
AS
SELECT * FROM my_catalog.dlt_loans.total_loan_balances

The table is completely re-created for each execution. Note that during the table creation process, its data could not be available for a short period of time.

Option 2 - Re-create the aggregation as a standard Delta table

We can create the table with the following pyspark code:

(
 spark.sql("""SELECT sum(revol_bal) AS bal, addr_state AS location_code FROM my_catalog.dlt_loans.historical_txs GROUP BY addr_state
 UNION SELECT sum(balance) AS bal, country_code AS location_code FROM my_catalog.dlt_loans.cleaned_new_txs GROUP BY country_code""")
 .write
 .mode('overwrite')
 .saveAsTable('my_catalog.dlt_loans.total_loan_balances_delta')
)

For each execution, the table is fully overwritten with an atomic operation, and not re-created as in option 1. The advantage with this approach is that we can keep its history and the data is always available.

Option 3 - Use a MERGE-UPSERT statement

With the following code we can merge the data from the MV into a Delta table.

MERGE INTO my_catalog.dlt_loans.total_loan_balances_delta as balances_delta
USING my_catalog.dlt_loans.total_loan_balances as balances_mv
ON balances_delta.country_code = balances_mv.country_code
WHEN MATCHED THEN
 UPDATE SET
   bal = balances_mv.bal,
   location_code = balances_mv.location_code
WHEN NOT MATCHED
 THEN INSERT (
   bal,
   location_code)
 VALUES (
   balances_mv.bal,
   balances_mv.location_code
 )

With this approach we also ensure that the table always exists as it is updated by delta atomic operation.
The Downside is that this approach would not be efficient for large tables. In this scenario, the data layout could significantly impact both write and read performance, especially as the table size grows. Refer to Liquid clustering documentation for more details.
In the next paragraph we are going to discuss performance considerations.

 

Additional Performance considerations

As the Delta table does not longer leverage on the automated optimisations of DLT, we can add some properties to our Delta table to give it some Materialized Views properties: 

val_data_eng_1-1729608087039.png

Because DLT automatically manages your physical data to minimize costs and optimize performance, it would be convenient to enable Predictive Optimization for the shared schema. However, this feature is not available with Delta Sharing. 

Instead, users can schedule a job to run OPTIMIZE and VACUUM additionally.

Streaming Tables

A Streaming Table (ST) is a type of DLT table that simplifies the creation of streaming or incremental workloads. STs allow you to process a growing dataset efficiently. They typically process data from an append-only data source.

If we want to Delta Share a DLT Streaming Table, we can create a copy as a standard Delta table. We can do so efficiently by leveraging structured streaming. 

We can use our DLT Streaming Table as the source of a simple Spark Structured Streaming job that replicates the data.

The following code snippet is a Spark Structured Streaming job that writes into a non-DLT Delta Table.

raw_df = spark.readStream \
   .table("my_catalog.dlt_target.raw_txs") # our Streaming Table

raw_df.writeStream \
       .trigger(availableNow=True) \
       .option("checkpointLocation", checkpoint_path) \
       .toTable("my_catalog.dlt_target.raw_txs_ss_copy") # a standard Delta table

In the example above, we use the availableNow trigger, which is the option for incremental batch jobs.

However, this process could also run as a continuous streaming job. For details about the triggers and processing modes in Structured Streaming, please refer to https://docs.databricks.com/en/structured-streaming/triggers.html and https://docs.databricks.com/en/structured-streaming/index.html.

As in the Materialized Views example, DLT Streaming tables come out of the box with optimizations and automated configurations. Those configurations need to be manually added in Structured Streaming and include:

  • Configuration of the checkpoint location
  • File storage layout configurations for file size organization, optimize and compaction

Please also note that Structured Streaming requires a separate computing cluster from the DLT pipeline. We will discuss this in the next paragraph when we will see how to combine the DLT and Structured Streaming jobs in a single workflow.

The data copied via Structured Streaming can now be consumed via Delta Sharing by the client.

The table can be read statically by a downstream process that requires access to the latest snapshot of the data. More interestingly, it can be consumed incrementally or as a continuous stream using Apache Spark™ Structured Streaming over Delta Sharing.

Assuming your client still uses Spark, this approach represents a simple and efficient alternative to streaming data across different systems/regions using specialized solutions such as Kafka.

The main advantages of this more straightforward approach are:

  • no additional infrastructure is required 
  • no de/serialization is required 
  • There is no need for a schema registry 
  • it achieves reasonable latency

val_data_eng_2-1729608086946.png

(see  “How to Use Delta Sharing for Streaming Data” for details)

 

The high-level process for streaming data over Delta sharing can be described as follows:

  1. The client authenticates
  2. The client requests for a table version  
  3. The provider server checks the permissions
  4. The Delta Sharing Server generates pre-signed, short-lived URLs
  5. The Client pulls the parquet files for the requested version

val_data_eng_3-1729608087082.png
[High-level architecture of Stream on a Delta Share - source: “How to Use Delta Sharing for Streaming Data”]

The following code snippet is an example of how to pull the data on the Client/Consumer workspace: 

stream = (
   spark
   .readStream
   .format("deltaSharing")
   .option("skipChangeCommits", True)
   .table("my_delta_share_name.dlt_target.raw_txs_ss_copy")
)

Caveats:

  • The Above code assumes a Databricks-to-Databricks share within the same account.
    If consuming via Open sharing, you are also going to need a credential file and to modify the code accordingly (see https://docs.databricks.com/en/delta-sharing/read-data-open.html😞
    .load(f"<credentials-path>#<share-name>.<schema-name>.<table-name>")
  • If the Data Provider regularly runs a VACUUM process, which clears old versions of the shared table, they must ensure that all consumers have read the relevant files before purging them. They can achieve this by checking the system audit tables for deltaSharingQueriedTable events (more details: https://docs.databricks.com/en/delta-sharing/audit-logs.html
  • If we execute the stream in continuous mode, by default, the Delta Sharing server throttles the stream capacity to prevent overloading. We can control this behavior with spark.delta.sharing.streaming.queryTableVersionIntervalSeconds to achieve better latency potentially
  • Reading a deltaSharing source in structured streaming requires a cluster with a shared access mode.

More details on Streaming and Delta sharing, focusing on low latency requirements, are well described  in this Data+AI 2024 talk: “How to Use Delta Sharing for Streaming Data.”

 

Combining the DLT and Structured Streaming processes in a single Job

To ensure that the DLT and non-DLT processes are executed synchronously, we can combine them in 1 single Workflow Job, which triggers the Structured Streaming and Materialized view copy processes 

For example, we can create a Workflow Job with the following tasks:

  1. A Pipeline task triggering the DLT pipeline
  2. A notebook task running the notebook executing the CTAS statements to copy the shared materialized views
  3. A notebook task running the notebook running the Producer structured streaming copy of the shared streaming tables

The final pipeline results in the following graph in the Workflow UI:

val_data_eng_4-1729608087016.png

 

Conclusion

While Databricks continuously develops its ecosystem, directly integrating Delta Sharing D2O with Delta Live Tables (DLT) is still a work in progress. However, this should not hinder your data sharing and pipeline development efforts.

You can effectively bridge the current functionality gap by following the techniques outlined in this blog – specifically, granting DLT table properties and leveraging Structured Streaming. These methods enable you to maintain the integrity and real-time capabilities of your DLT tables while facilitating seamless data sharing.

As we await future updates, these approaches serve as robust interim solutions. They not only address immediate challenges but also align with best practices for scalable and efficient data architecture. Keep an eye on Databricks' release notes for upcoming features, and in the meantime, utilize these strategies to optimize your data pipelines and sharing capabilities.

1 Comment
Lakshay
Databricks Employee
Databricks Employee

Very interesting and useful!