In this blog we are going beyond the basics to explore the internals of Databricks MERGE INTO.
By the end of this article, you will learn:
- How the merge command works.
- How running MERGE with Photon and a liquid-clustered target table can supercharge your merge statements with a ~5x improvement.
- How to access merge performance metrics and use them to tune the merge statement.
- How to use broadcasting with merge to achieve a ~7x improvement.
- How to rewrite certain merge statements to achieve a ~2x improvement.
This blog assumes some familiarity with the following concepts:
- Liquid Clustering
- Deletion Vectors
- Dynamic File pruning
- Types of joins (Broadcast-Hash and Shuffle-Hash)
How merge works
Let's start with the merge query below; let's call it query1.
merge into
target_data as tgt
using
source_data1 as src
on tgt.user_id = src.user_id
when matched then update set *
when not matched then insert *
The terminology used here is target (target_data) for the table we are merging into, and source (source_data1) for the table that has the change feed/data that needs to be merged into the target. In a nutshell, we can break down the execution of merge into two major phases: Scan and Rewrite.
Scan phase
In the scan phase of MERGE, both source and target are scanned and joined together to find matches. The join can be broadcast or shuffle-hash, depending on the sizes of the source and target, as well as the type of join (inner, left-anti, and others). Let's refer to the number of target files containing rows that match the source as K. The outcome of this phase is the list of those K files, in preparation for the next phase, the rewrite.
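Conceptually, the output of the scan phase resembles the query below, which joins source and target and collects the distinct target file paths that contain matches. This is only an illustrative sketch of the idea (the real implementation is internal to Delta), and it assumes the _metadata.file_path metadata column is available for the table.
%sql
-- Illustrative sketch: approximate the K target files touched by the merge
select distinct tgt._metadata.file_path
from target_data as tgt
inner join source_data1 as src
on tgt.user_id = src.user_id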
Rewrite phase
In this phase only those K files are scanned from the target, along with everything from the source, and joined to apply updates for the matching (M) rows and inserts for the new (N), or not matched, rows. While M + N rows get written as new Delta files, there will be K deletion vectors or Delta file rewrites (or both) in the target table. Since the K target files and the source table are being scanned for the second time, Disk Caching comes into effect: the tables are scanned from the SSDs of the compute nodes instead of cloud storage, so the table scans are much faster.
Rewriting a Delta file vs Deletion vectors
If the portion of data modified in a Delta file from the target table reaches a level where read amplification(using Deletion vectors) exceeds the rewrite amplification, the Delta file will be rewritten. For example, if more than 50% of a Delta file’s rows have a deletion vector associated with them, then it probably makes sense to rewrite the Delta file rather than creating a new deletion vector entry. On the flip side, if less than 1% of a Delta file needs to be rewritten, then it probably makes more sense to create a deletion vector rather than rewriting the Delta file.
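To check which path a particular merge took, you can compare the deletion-vector and file counts recorded in the table history. Below is a minimal sketch using the MERGE operation metrics referenced later in this blog (numTargetDeletionVectorsAdded, numTargetFilesAdded, numTargetFilesRemoved); the exact metric keys can vary by DBR version.
%sql
-- Did the last MERGE create deletion vectors or rewrite whole Delta files?
with history_tab as (
describe history target_data
)
select
version,
operationMetrics.numTargetDeletionVectorsAdded as dvs_added,
operationMetrics.numTargetFilesAdded as files_added,
operationMetrics.numTargetFilesRemoved as files_removed
from history_tab
where operation = 'MERGE'
order by version desc
limit 1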
How LC + DFP + Photon improve Merge Performance
Before we dive into this topic, let's do a quick summary of the input tables (the code for creating them is at the bottom of this blog). target_data has 5B rows stored in 500 files, source_data1 has 8k rows in 5 files, and user_id is the unique key in both tables. target_data doesn't have liquid clustering enabled. Let's use the same merge query above (query1) for the 3 scenarios below. All the merge queries covered in this blog are run on Classic or Serverless warehouses; this makes it easy to skim through the pruning metrics.
No Optimizations
This scenario covers running query1 on Classic warehouses or non-Photon compute clusters. Although classic warehouses support certain Photon operations, they don't fully Photonize MERGE. Also, for this scenario the tables don't have liquid clustering enabled.
Scan phase
Spark reads/scans all 500 files from the target and 5 files from the source, and determines that all 500 target files have rows matching the source. This list of file names/paths is the input to the next phase, the rewrite.
Rewrite phase
During this phase, 500 files from the target and 5 files from the source are scanned; the 4001 rows that match between target and source get updated, and the rest of the rows from the source are inserted. Since all 500 files are impacted by the merge, 500 deletion vectors are created.
Metrics
Query profiler metrics show that this query took 92 sec (wall clock time) and read 1010 files. 1010 = 2 * 500 (target) + 2 * 5 (source). Delta history metrics show the number of rows modified (inserted + updated) and the number of deletion vectors created.
Liquid Clustering
This scenario also covers running query1 on Classic warehouses or non-Photon compute clusters, but the difference here is that the target table is liquid clustered by the user_id column. Below is the command for liquid clustering:
%sql
alter table target_data cluster by (user_id);
optimize target_data;
The above commands change the underlying file layout. Key metrics: numFilesAdded: 430, numFilesRemoved: 500.
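You can verify the new layout with DESCRIBE DETAIL, which reports the current file count and (on recent DBR versions) the clustering columns; the numbers in the comments are specific to this example.
%sql
-- Check file count and clustering columns after OPTIMIZE
describe detail target_data;
-- Expect numFiles around 430 and clusteringColumns = ["user_id"] here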
Scan phase
Spark reads/scans all 430 files from the target and 5 files from the source, and determines that only 8 out of the 430 target files have rows matching the source. In simple terms, the same 4001 rows matching between target and source were found in just 8 files! This list of file names/paths is the input to the next phase.
Rewrite phase
During this phase, 8 files from the target and 5 files from the source are scanned; the 4001 rows that match between target and source get updated, and the rest of the rows from the source are inserted. Since only 8 files are impacted by the merge, 8 deletion vectors are created.
Metrics
Query profiler metrics show that this query took 74 sec (wall clock time), with 448 files read and 422 files pruned. 448 = 430 (target during the scan phase) + 8 (target during the rewrite phase) + 2 * 5 (source). Delta history metrics show the number of rows modified (inserted + updated) and the number of deletion vectors created.
Liquid Clustering + Photon
This scenario covers running query1 on Serverless warehouses or Photon-enabled compute clusters. DFP kicks in automatically for MERGE when using the Photon engine. The target table is liquid clustered by the user_id column and has 430 files.
Scan phase
Unlike Spark, Photon doesn’t scan the entire target table and perform the join. Photon first checks if it can broadcast the source table. In our case, it did broadcast, which means that our query likely attempted to dynamically prune files.
Dynamic File Pruning (DFP)
As part of DFP, all the distinct keys (user_id) from the source table are collected and broadcasted; in this case there are 8k unique user_ids in the source table. The target table (which is already liquid clustered by user_id) is then filtered/queried for only those keys; the operator is called SubqueryBroadcast, and it leads to much better pruning. On the flip side, if the target table had not been liquid clustered, DFP would still execute SubqueryBroadcast, but the pruning may not be as efficient. In this scenario, 104M rows were read from the target and 4.89B rows were skipped.
So during this phase, DFP helps to read only 8 files from the target and 5 files from the source, producing the list of file names/paths with matching rows that is the input to the next phase.
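Conceptually, the pruning that SubqueryBroadcast enables is equivalent to filtering the target scan by the broadcasted set of distinct source keys, roughly like the sketch below. This is only an illustration of the idea, not the literal plan Photon generates.
%sql
-- Illustrative sketch: the target scan is effectively filtered by the
-- distinct source keys, so only the matching files/rows are read
select *
from target_data
where user_id in (select distinct user_id from source_data1)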
Rewrite phase
During this phase, 8 files from the target and 5 files from the source are scanned; the 4001 rows that match between target and source get updated, and the rest of the rows from the source are inserted. Since only 8 files are impacted by the merge, 8 deletion vectors are created.
Metrics
Query profiler metrics show that this query took 17 sec (wall clock time), with 26 files read and 844 files pruned. 26 = 8 (target during the scan phase) + 8 (target during the rewrite phase) + 2 * 5 (source). We can see how the merge time dropped from 74 sec to 17 sec, which is why we always recommend using Liquid Clustering + Photon for merge. Delta history metrics show the number of rows modified (inserted + updated) and the number of deletion vectors created.
Overall, we went from 92 sec to 17 sec, a ~5x improvement, by using Photon with a liquid-clustered target table.
Gotchas with DFP
- DFP kicks in only when the join performed during the scan phase can be a broadcast join. Otherwise DFP won't kick in and a full target table scan is performed. While AQE does its best to determine whether the table should be broadcasted, you can force the broadcast using hints. We cover this topic in depth in the next section.
- If the number of unique keys in the source table is greater than 10k, the min & max of this list are used for SubqueryBroadcast, rather than forming a subquery with every element in the list. This can sometimes make the pruning less efficient, meaning more files might be scanned from the target, but it should still be more efficient than scanning everything from the target (see the sketch after this list).
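To illustrate that second gotcha: with more than ~10k distinct source keys, the pruning condition effectively degrades from an explicit key list to a min/max range filter on the merge key, roughly like the sketch below (again, an illustration rather than the literal plan; source_data2 is used here because it has millions of distinct keys).
%sql
-- Illustrative sketch: with >10k distinct source keys, pruning falls back
-- to a min/max range filter instead of an explicit key list
select *
from target_data
where user_id >= (select min(user_id) from source_data2)
and user_id <= (select max(user_id) from source_data2)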
Merge metrics from Delta table history
In this section we will see what the Spark UI side of things looks like, and how to get the same metrics from Delta history.
Querying Delta history: we are only showing the metrics related to the scan and rewrite phases here; we recommend going through all of the available metrics.
- scanTimeMs: Total duration of the scan phase during merge.
- rewriteTimeMs: Total duration of the rewrite phase during merge.
- executionTimeMs: Total duration of the merge.
Querying Delta table history:
%sql
with history_tab as (
describe history target_data
)
select
operationmetrics.scanTimeMs,
operationmetrics.rewriteTimeMs,
operationmetrics.executionTimeMs
from history_tab where history_tab.operation = 'MERGE'
order by version
The entries in the Spark UI labeled "scanning files for matches" show the tasks for the scan phase. The entries labeled "rewriting N files" show the tasks for the rewrite phase.
No Optimizations
SparkUI
We can see that all 500 files from the target were touched, but they were not fully rewritten/overwritten; instead, 500 deletion vectors were created.
Delta history - merge metrics
Scan phase took ~57s vs total merge time ~82s.
Liquid Clustering
SparkUI
Notice that only 8 files from target were rewritten.
Delta history - merge metrics
Scan phase took ~44s vs total merge time ~63s. Note that scanTimeMs is slightly lower than in the previous run (57s), because 430 files are read during the scan phase instead of 500. rewriteTimeMs is 5s vs 9s, because only 8 files were scanned/rewritten from the target table vs 500 in the previous scenario.
Liquid Clustering + Photon
SparkUI
On Photon runtimes this view is compacted.
Delta history - merge metrics
The scan phase duration was drastically cut, taking 7 seconds versus our previous 57 and 44 seconds. This is due to how complementary DFP and clustering are. The rewrite phase took 4s, slightly faster than the previous scenario because it is operating on the same amount of data while using Photon for the speed boost.
Gotchas with rewriteTimeMs
In all the scenarios above, we observed that rewriteTimeMs is much lower than scanTimeMs. This may not always be the case: in these runs, the rewrite phase doesn't involve rewriting entire Delta files; only deletion vectors are generated. If the portion of data modified in a Delta file reaches a level where read amplification exceeds rewrite amplification, the Delta file will be rewritten, and in such scenarios you will see a higher rewriteTimeMs.
Why are merge metrics from Delta table history important?
Having to tune merge for performance is very common. When you observe that your merge command is taking longer than expected, you can simply query the merge metrics from Delta history to identify:
- How has each part of the merge been performing over time? Has its performance degraded over time? Is its performance cyclical or predictable?
- What fraction of the merge time goes to the scan vs the rewrite phase?
- If scanTimeMs is taking longer, check whether pruning happened and whether you can make pruning better by clustering on meaningful merge keys.
- If rewriteTimeMs is taking longer, check what's going on with deletion vectors. Compare numTargetDeletionVectorsAdded vs numTargetRowsUpdated vs numTargetFilesAdded to see if the slowness is because Delta files are being rewritten.
For example, if you run the above query against Delta history, it's easy to identify the merge trend, which part of the merge is taking longer, and so on.
Note: If the source table used in the merge is a view (either created from a DataFrame or a query), it can add additional time, as the source dataset needs to be materialized first. materializeSourceTimeMs represents the duration of materializing the source dataset during merge. If your source dataset is a Delta table, this duration is usually negligible (milliseconds).
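To track these metrics over time (including source materialization), you can extend the earlier history query along the lines of the sketch below; it assumes the listed keys are present in operationMetrics for your DBR version.
%sql
with history_tab as (
describe history target_data
)
select
version,
timestamp,
operationMetrics.scanTimeMs,
operationMetrics.rewriteTimeMs,
operationMetrics.materializeSourceTimeMs,
operationMetrics.numTargetRowsUpdated,
operationMetrics.numTargetRowsInserted,
operationMetrics.numTargetDeletionVectorsAdded,
operationMetrics.numTargetFilesAdded
from history_tab
where operation = 'MERGE'
order by version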
How to Broadcast in Merge query
For this section, both merge queries are run on a medium Serverless warehouse, which means you can expect LC + DFP + Photon. We don't focus on the rewrite phase of merge here, since we are trying to optimize the scan phase.
Quick summary of the input tables: target_data has 5B rows in 430 files (81GB), liquid clustered by user_id. source_data2 has 5M rows in 5 files (76MB). user_id is the unique key in both tables.
No broadcast hint
Let's start with the merge query below; let's call it query2.
merge into
target_data as tgt
using
source_data2 as src
on tgt.user_id = src.user_id
when matched then update set *
when not matched then insert *
Even though we are using Photon here, DFP doesn't kick in during the scan phase. The source table is 76MB, which is too large to be broadcasted automatically, so a ShuffleHash join is performed instead of a BroadcastHash join. DFP kicks in only when the join can be broadcasted, and since there is no DFP, file pruning is also not effective.
The query took 2 min 39 s to finish, and most of the time went to the scan phase (2 min 29 s).
Broadcast hint
Even though we are using Photon and the target table is liquid clustered, the merge is still slow. Now let's try to speed up the merge query by using a broadcast hint on the source table. We are explicitly broadcasting the 76MB (source) table to avoid shuffling the 81GB (target) table. Let's call this query3.
merge into
target_data as tgt
using (
select /*+ BROADCAST */ * from source_data2
) as src
on tgt.user_id = src.user_id
when matched then update set *
when not matched then insert *
Now a BroadcastHash join is performed instead of a ShuffleHash join, DFP kicks in, and file pruning is very effective.
The query took 21s to finish (previously 2 min 39s); the scan phase is much faster.
If the same query were run with the broadcast hint on a classic warehouse, we should still see a good improvement compared to running without the hint. The improvement can be attributed to the fact that we are still avoiding the ShuffleHash join, but DFP won't kick in on classic warehouses.
We can see that the merge execution time is reduced from 159 sec to 21 sec by broadcasting the source table, a ~7x improvement.
The goal is not to suggest always broadcasting while doing a merge; it worked out in this case because, by explicitly broadcasting a 76MB (source) table, we avoided shuffling the 81GB (target) table. Another successful example would be broadcasting a 2GB source table against a 2TB target table: while broadcasting 2GB might sound unrealistic, it is still worth it considering the target table size. An unsuccessful example would be broadcasting a 2GB source table against a 10GB target table.
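Before reaching for a hint, it helps to sanity-check both table sizes; DESCRIBE DETAIL reports sizeInBytes and numFiles for each table.
%sql
-- Compare source and target sizes before deciding to broadcast the source
describe detail source_data2; -- sizeInBytes is ~76MB in this example
describe detail target_data; -- sizeInBytes is ~81GB in this example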
Merge query - Left Anti
In this section we are going to cover a MERGE that has only a when not matched then insert clause, and how to rewrite it to run faster. This kind of MERGE is more of an INSERT, but it checks whether a row's key already exists in the target. Both queries are run on a medium Serverless warehouse.
Quick summary of the input tables: target_data has 5B rows (81GB), liquid clustered by user_id. source_data1 has 8k rows (155 KB). user_id is the unique key in both tables. There are 3,999 user_ids that exist in the source but not in the target.
Merge query - only when not matched
Let's start with the merge query below; let's call it query4.
merge into
target_data as tgt
using
source_data1 as src
on tgt.user_id = src.user_id
when not matched then insert *
In this scenario the query optimizer recognizes that the user is just inserting data, and chooses to perform a left anti join between source and target.
It took 48 sec to finish the query, and there is no scan phase, only rewrite.
The query profile shows that the source is left-anti joined with the target and the result is inserted into the target. Broadcast hints likely will not help here: the left side may be small enough to broadcast, but you can only broadcast the right side of a left anti join, and the right side (the target) is often too large to be broadcasted.
Refactoring merge query with joins
As discussed above, the source left anti join target can be very expensive, as it involves shuffling the target table. Let's speed up the merge process by rewriting it as shown below. Let's call this query5.
insert into target_data
with matching as (
select source_data1.user_id
from target_data inner join source_data1
on target_data.user_id = source_data1.user_id
)
select
source_data1.*
from source_data1 left anti join matching
on source_data1.user_id = matching.user_id
With query5, you may have noticed that we aren't performing a merge at all, and you'd be right. We rely on the nature of this type of merge: it is just an insert, so we do not rewrite any rows in the target. Instead of performing a left anti join against the whole target, we first perform a broadcastable inner join between source and target to find any matching rows. Next, we anti join the source table with these matches. Finally, we safely insert the rows resulting from the anti join into the table.
In this case, we were able to broadcast the first join, leading to a runtime improvement of ~2x (43s vs 23s).
Summary
- Your merge will be most performant if you use the Photon engine and your target table is liquid clustered by a merge key. This is due to DFP, and we always recommend using Photon for merge.
- While you can get merge metrics from the Spark UI, you can always get them programmatically from the Delta table history. This comes in very handy for identifying slowness/bottlenecks in merge.
- If your source table is not broadcasted during the scan phase of the merge, then depending on the sizes of your source and target tables, you can use broadcast hints to speed up the merge.
- A merge statement with only a when not matched then insert clause can be rewritten using joins, and this is faster than the merge most of the time.
Code for creating input tables
%pip install dbldatagen
%sql
set spark.sql.shuffle.partitions = auto;
create catalog if not exists mt_blog;
create schema if not exists mt_blog.merge_demo;
import dbldatagen as dg
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, DateType, LongType, TimestampType
from datetime import timedelta, datetime, date
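# target_data: 5B rows with unique user_ids from 1 to 5B, later repartitioned into 500 files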
merge_source_generator = (dg.DataGenerator(spark, name="merge_source", rows=5000000000, partitions=400)
.withColumn("user_id", LongType(), minValue=1, maxValue=5000000000, step=1, random=False, uniqueValues = 5000000000)
.withColumn("segment_id", IntegerType(), minValue=1, maxValue=10000000, step=1, random=False)
.withColumn("dt", DateType(), random=False)
.withColumn("event_ts", TimestampType(), begin="2010-01-01 01:00:00", end="2024-12-31 23:59:00", interval="1 second", random=False )
)
merge_source_df = merge_source_generator.build() # build our dataset
merge_source_df.repartition(500, 'user_id').write.format('delta').mode('overwrite').saveAsTable('mt_blog.merge_demo.target_data')
%sql
alter table mt_blog.merge_demo.target_data cluster by (user_id);
optimize mt_blog.merge_demo.target_data;
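# source_data1: 8k change rows; the user_ids straddle the target's max key (5B), so roughly half match existing target rows and the rest are new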
merge_change_generator1 = (dg.DataGenerator(spark, name="merge_source", rows=8000, partitions=5)
.withColumn("user_id", LongType(), minValue=4900000000, maxValue=5100000000, step=25000, random=False, uniqueValues = 8000)
.withColumn("segment_id", IntegerType(), minValue=10000001, maxValue=11000000, step=1, random=True)
.withColumn("dt", DateType(), random=True)
.withColumn("event_ts", TimestampType(), begin="2025-01-01 01:00:00", end="2025-12-31 23:59:00", interval="1 second", random=False )
)
merge_change_df1 = merge_change_generator1.build() # build our dataset
merge_change_df1.write.format('delta').mode('overwrite').saveAsTable('mt_blog.merge_demo.source_data1')
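# source_data2: 5M change rows (~76MB across 5 files), used for the broadcast-hint scenario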
merge_change_generator2 = (dg.DataGenerator(spark, name="merge_source", rows=5000000, partitions=5)
.withColumn("user_id", LongType(), minValue=4800000000, maxValue=5300000000, step=100, random=False, uniqueValues = 5000000)
.withColumn("segment_id", IntegerType(), minValue=10000001, maxValue=11000000, step=1, random=True)
.withColumn("dt", DateType(), random=True)
.withColumn("event_ts", TimestampType(), begin="2025-01-01 01:00:00", end="2025-12-31 23:59:00", interval="1 second", random=False )
)
merge_change_df2 = merge_change_generator2.build() # build our dataset
merge_change_df2.write.format('delta').mode('overwrite').saveAsTable('mt_blog.merge_demo.source_data2')