If you've worked with Delta Lake at scale, you've encountered this: your MERGE operation that once completed in seconds now takes minutes—or hours—as your table grows.
The culprit? Without optimization, MERGE operations scan far more data than necessary, even when your table is properly partitioned or liquid clustered and your MERGE condition includes the key column.
Here's the part that's often overlooked: the key is to pre-scan your source data and inject literal values into the MERGE condition. This enables partition and file pruning that would otherwise be impossible—for both streaming and batch workloads.
This article explains how MERGE executes at a high level, why the default behavior is problematic, and the strategies to fix it.
A typical MERGE looks innocent enough:
target_delta.alias("target").merge(
    source=source_df.alias("source"),
    condition="target.order_id = source.order_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
Delta Lake executes this through multiple phases:
Phase 1: Transaction Log Analysis Spark reads the _delta_log to build the current table state—loading checkpoint files, reading commit JSONs, and extracting file paths, partition values, and column statistics.
Phase 2: File Selection (THE PROBLEM) Delta determines which files to scan. By default, this is ALL files. We'll dive deep into why shortly.
Phase 3: Join Execution Delta joins source and target to find matches. The join strategy is chosen by the optimizer based on data sizes and the MERGE clauses — you'll see BroadcastHashJoin, SortMergeJoin, or ShuffledHashJoin in the query plan, with join types like LeftSemi, Inner, or LeftAnti depending on whether the MERGE has matched updates, inserts, or both.
Phase 4: File Rewrite (Copy-on-Write) By default, every affected file is rewritten entirely. Even updating 1 row in a 256MB file means reading 256MB and writing 256MB. This is write amplification. (With Deletion Vectors enabled, Delta can mark rows as deleted without full file rewrites—more on this later.)
Phase 5: Transaction Commit The transaction log is updated atomically with remove actions for old files and add actions for new files.
Consider an e-commerce orders table, partitioned or liquid clustered by region, with order_id and event_time completing the primary key.
Your MERGE condition looks like this:
target.region = source.region
AND target.order_id = source.order_id
AND target.event_time = source.event_time
This is exactly how you should write it—matching on the partition/clustering key column and the full primary key. Yet every micro-batch scans the entire table, even when processing just 5,000 order updates from 2 regions.
Why? This brings us back to Phase 2.
The MERGE condition above includes region — whether that's a partition column or a Liquid Clustering key. Intuitively, you'd expect Spark to read the distinct region values from the source batch, prune to just the matching partitions or file ranges, and scan only those.
But this doesn't happen.
At query planning time (Phase 2), the source DataFrame hasn't been evaluated—Spark doesn't know what values it contains. The condition target.region = source.region references a column from an unevaluated DataFrame. Spark cannot push this down as a predicate because it has no concrete values to filter on.
The result: every file in your table gets scanned, regardless of how you structure your join condition — whether your table uses PARTITIONED BY (region) or CLUSTER BY (region, ...).
This is a common surprise for engineers: "My table is partitioned/clustered by region and I'm joining on region—why is it still scanning everything?"
Pruning only works when the merge condition contains literal values that Spark can evaluate at plan time. This applies equally whether your table uses partitioning or Liquid Clustering — the technique is identical.
Same table, same primary key. But now we extract values first:
# Pre-scan the source batch
regions = source_df.select("region").distinct().collect()
region_list = ", ".join([f"'{r.region}'" for r in regions])
# Inject literals into the condition
condition = f"""
target.region IN ({region_list})
AND target.region = source.region
AND target.order_id = source.order_id
AND target.event_time = source.event_time
"""
The IN ({region_list}) clause contains literal strings like IN ('us-east', 'eu-central'). These are known at plan time, so Spark can now skip data.
The rest of the condition handles the actual row matching. The literal clause handles the pruning.
Key insight: You're not replacing the join condition—you're augmenting it with pruning hints that Spark can evaluate at plan time. This works the same way for PARTITIONED BY tables and CLUSTER BY tables.
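One practical wrinkle with the f-string quoting used above: a region value containing a single quote would break the generated SQL. A small helper (hypothetical, not part of any library) hedges against that by doubling embedded quotes before splicing values into the condition:

```python
def to_sql_in_list(values):
    """Render Python strings as a quoted SQL IN-list, doubling embedded single quotes."""
    return ", ".join("'" + str(v).replace("'", "''") + "'" for v in values)

# Hypothetical region values, one of which contains a quote
region_list = to_sql_in_list(["us-east", "eu-central", "o'hare-west"])
condition = f"target.region IN ({region_list}) AND target.region = source.region"
```

The same helper works for any string-typed pruning column, not just region.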
Your injected literals drive file skipping. The mechanism depends on your table layout, but the result is the same — Spark reads only the files that could contain matching rows:
| Table layout | What target.region IN ('us-east-1', 'eu-central-1') does |
|---|---|
| PARTITIONED BY (region) | Skips entire partition directories. 48 of 50 directories ignored — 96% reduction. |
| CLUSTER BY (region, ...) | Skips files whose min/max on region don't overlap. Same 96%+ reduction — no directories needed. |
They're equivalent. Whether your table uses partitioning or Liquid Clustering, literal injection on a key column tells Spark exactly which data to skip. The only difference is the mechanism (directory-level vs file-level stats), not the outcome.
Go further with additional columns. Delta Lake stores min/max statistics on columns of numeric, string, date, and timestamp types. Inject ranges on any column with statistics — not just the partition/clustering key:
With Liquid Clustering, all clustering keys (region, event_time, order_id) benefit equally because the space-filling-curve layout keeps file-level min/max ranges tight on all keys simultaneously. With traditional partitioning, only the partition column gets directory-level pruning; other columns rely on Z-ordering for tight file ranges.
Here's the full pattern — extract values from the source, inject them as literals on key columns:
from pyspark.sql import functions as F

# Extract pruning values
regions = source_df.select("region").distinct().collect()
region_list = ", ".join([f"'{r.region}'" for r in regions])
time_bounds = source_df.agg(
    F.min("event_time").alias("min_time"),   # alias so the Row fields are addressable
    F.max("event_time").alias("max_time")
).collect()[0]
# Inject literals — works for both partitioned and liquid clustered tables
condition = f"""
target.region IN ({region_list})
AND target.event_time >= '{time_bounds.min_time}'
AND target.event_time <= '{time_bounds.max_time}'
AND target.region = source.region
AND target.order_id = source.order_id
AND target.event_time = source.event_time
"""
Important: File-level statistics are collected only for the first 32 columns by default (controlled by delta.dataSkippingNumIndexedCols), and string statistics keep only the first 32 characters. For wide tables, either reorder your columns, increase this setting, or use delta.dataSkippingStatsColumns to specify columns by name.
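Both knobs are ordinary Delta table properties. A sketch, assuming the orders table from the running example and an active SparkSession named spark (note that changing the property affects statistics for newly written files; existing files keep their old stats until rewritten, e.g. by OPTIMIZE):

```python
# Sketch: pin file-level statistics to the columns you actually prune on.
# `spark` is an active SparkSession; `orders` is the running example table.
spark.sql("""
    ALTER TABLE orders SET TBLPROPERTIES (
        'delta.dataSkippingStatsColumns' = 'region,event_time,order_id'
    )
""")
```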
Safety rule: Only inject range filters on columns that are part of your MERGE key. Filtering on a column not in the MERGE condition risks skipping files that contain rows needing updates.
The literal injection technique requires two passes over your source data: one to extract pruning values, one for the MERGE itself. This has important implications for how you structure your pipeline.
Key requirement for streaming: You must use foreachBatch mode. In a continuous streaming query (without foreachBatch), Spark builds a single execution plan for the entire stream — there's no hook to pre-scan a micro-batch and inject literals before the MERGE runs. The foreachBatch callback gives you a materialized DataFrame that you can query separately before passing it to the MERGE. Without foreachBatch, literal injection is not possible in streaming.
For Streaming (foreachBatch):
def merge_micro_batch(batch_df, batch_id):
    # batch_df is already materialized — safe to query twice
    regions = batch_df.select("region").distinct().collect()
    region_list = ", ".join([f"'{r.region}'" for r in regions])
    condition = f"""
        target.region IN ({region_list})
        AND target.region = source.region
        AND target.order_id = source.order_id
        AND target.event_time = source.event_time
    """
    target_delta.alias("target").merge(
        source=batch_df.alias("source"),
        condition=condition
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

stream.writeStream.foreachBatch(merge_micro_batch).start()
The micro-batch is small and already bounded — recomputation is negligible. Caching is optional.
For Batch: Cache your source DataFrame first. Without caching, the extract step triggers one computation and the MERGE triggers a second. This double-read is wasteful and dangerous:
Warning — Data Consistency Risk: If your source is non-deterministic (e.g., uses current_timestamp(), reads from a file source that changed between evaluations, or samples data), the MERGE might see different data than what you used for pruning. This can lead to silent data loss or corruption — rows that existed during pruning may vanish during the MERGE, or new rows may appear that don't match the pruning predicates. Always cache non-deterministic sources before extracting pruning literals.
Pattern: source.cache() → collect() → merge() → source.unpersist()
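The consistency hazard is easy to see with a toy stand-in for an uncached source (all class names below are illustrative, not Spark APIs): every read of a non-deterministic source recomputes, while a cached one serves a single materialized snapshot to both the pruning pass and the MERGE pass.

```python
import itertools

class UncachedSource:
    """Toy stand-in for an uncached, non-deterministic DataFrame: every read recomputes."""
    def __init__(self):
        self._counter = itertools.count()
    def read(self):
        return [("us-east", next(self._counter))]  # changes between evaluations

class CachedSource:
    """Toy stand-in for .cache(): the first read materializes, later reads reuse it."""
    def __init__(self, inner):
        self._inner, self._snapshot = inner, None
    def read(self):
        if self._snapshot is None:
            self._snapshot = self._inner.read()
        return self._snapshot

uncached = UncachedSource()
pruning_pass, merge_pass = uncached.read(), uncached.read()
# pruning_pass != merge_pass: the MERGE sees different data than the pruning scan

cached = CachedSource(UncachedSource())
assert cached.read() == cached.read()  # both passes see the same snapshot
```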
A note on cost: The collect() call introduces its own overhead — it materializes results and returns them to the driver, adding a round-trip. For very small source batches, this overhead may not be worth it if the target table is already well-partitioned and the MERGE is fast. Weigh the cost of the collect() against the savings on the MERGE: if file pruning lets you skip scanning 90% of the target table, the driver round-trip is a bargain.
Bonus — Broadcast Hint: For small source batches (common in streaming), adding a broadcast hint eliminates the shuffle join:
from pyspark.sql.functions import broadcast

target_delta.alias("target").merge(
    source=broadcast(batch_df).alias("source"),
    condition=condition
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
This pairs well with literal injection — pruning reduces the target side, broadcast eliminates the shuffle on the source side.
Literal injection tells Spark what to filter. But how many files it actually skips depends on how your data is physically organized. If files contain randomly scattered values, even perfect literals won't skip much — every file's min/max range overlaps with everything.
This is where physical layout matters: organize your data so that files contain tight, non-overlapping value ranges. Then your injected literals become surgically precise.
You have two options — choose based on whether you're building new tables or optimizing existing ones.
For new tables, use Liquid Clustering. It replaces both partitioning and Z-ordering with a single, unified mechanism.
-- Everything you need: one line
CREATE TABLE orders (...) CLUSTER BY (region, event_time, order_id);
This is equivalent to — and better than — the traditional two-step approach:
-- Legacy: two separate mechanisms
CREATE TABLE orders (...) PARTITIONED BY (region);
OPTIMIZE orders ZORDER BY (event_time, order_id);
Why it's better for MERGE:
With CLUSTER BY (region, event_time, order_id), all three columns participate in an optimized layout. Files are organized across the entire table — not within partition boundaries. When you inject literals on any of these columns, Spark uses all the filters together to skip files.
Literal injection on a clustering key is functionally equivalent to partition pruning — Spark skips all files whose min/max stats on that key don't match the literal. The difference is that it's file-level (not directory-level), there are no partition boundaries, and every clustering key benefits equally from the same technique.
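The file-skipping decision itself is simple interval logic: a file survives only if its min/max stats overlap the injected literal range. A sketch with made-up per-file boundaries (ISO date strings compare correctly as strings):

```python
def can_skip(file_min, file_max, lit_min, lit_max):
    # A file is skipped when its [min, max] stats can't contain any matching row
    return file_max < lit_min or file_min > lit_max

# Hypothetical per-file min/max stats on event_time
files = {
    "file_1": ("2024-01-01", "2024-01-07"),
    "file_2": ("2024-01-08", "2024-01-14"),
    "file_3": ("2024-01-15", "2024-01-21"),
}
# Injected literal range extracted from the source batch
kept = [name for name, (lo, hi) in files.items()
        if not can_skip(lo, hi, "2024-01-05", "2024-01-06")]
# kept == ["file_1"]: two of three files are never touched
```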
Key advantages:
| Feature | Liquid Clustering | Partitioning + Z-Order |
|---|---|---|
| Pruning mechanism | File-level stats on all clustering keys | Directory pruning on partition column + file-level stats on Z-ordered columns within partitions |
| Maintenance | OPTIMIZE is incremental — only rewrites files with new/unclustered data | OPTIMIZE ZORDER BY is also incremental — it targets files with new/changed data, but re-sorts and rewrites all qualifying files to maintain Z-order |
| Column flexibility | ALTER TABLE ... CLUSTER BY (new_cols) is a metadata-only change with no rewrite; new data uses the new keys. Run OPTIMIZE FULL (DBR 16.0+) to recluster old data | Changing partition columns requires a full table rewrite. Changing Z-order columns takes effect on the next OPTIMIZE for qualifying files |
| Cardinality | Handles any cardinality automatically. Hierarchical clustering also lets you prioritize low-cardinality columns (like date or region) so data is fully sorted by those first, then clustered on the remaining keys | High-cardinality partitioning creates millions of tiny directories. Z-ordering handles high cardinality well, but only within partitions |
| Cross-key optimization | Optimizes all keys simultaneously across the entire table | Z-ordering is limited to within partition boundaries — files can't be reorganized across partitions |
| Automatic tuning | CLUSTER BY AUTO (DBR 15.4+) — Predictive Optimization selects and evolves keys | Manual column selection for both partition key and Z-order columns |
Best practice: Choose 1-4 columns that appear most frequently in your MERGE conditions and query filters. In our scenario: CLUSTER BY (region, event_time).
If you have existing partitioned tables that you can't migrate to Liquid Clustering, add Z-ordering to tighten file-level statistics within each partition.
-- Partition column for directory-level pruning
CREATE TABLE orders (...) PARTITIONED BY (region);
-- Z-order columns for file-level pruning within partitions
OPTIMIZE orders ZORDER BY (event_time, order_id);
Literal injection works the same way — inject region values for partition pruning, inject event_time ranges for file pruning within those partitions. The difference from Liquid Clustering is that Z-ordering only operates within partition boundaries, and OPTIMIZE ZORDER must re-sort and rewrite every qualifying file within the affected partitions.
After ZORDER BY (event_time):
File 1: event_time Jan 1-7, order_id 1-50K
File 2: event_time Jan 1-7, order_id 50K-100K
File 3: event_time Jan 8-14, order_id 1-50K
Note: Z-ordering improves locality but does not guarantee non-overlapping ranges, especially with skewed data.
Best practice: Z-order on 2-4 high-cardinality columns that appear frequently in your MERGE conditions.
Without Deletion Vectors, Delta Lake uses Copy-on-Write: to update a row, the entire file containing that row must be rewritten. This creates write amplification.
Example scenario: as in Phase 4 above, updating a single row in a 256MB file forces a full 256MB read and rewrite; a MERGE touching rows scattered across many such files multiplies that cost.
With smaller files (64MB): each touched file costs only 64MB to rewrite, a 4x reduction in amplification per touched file.
Trade-off: smaller files mean more files to list, more transaction-log metadata, and more per-file overhead on reads.
For MERGE-heavy tables: Consider smaller file size or enable Deletion Vectors (Delta Lake 2.3+) if the join keys have high cardinality.
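A back-of-the-envelope sketch of that trade-off (the file count is hypothetical, chosen only for illustration):

```python
def rewritten_mb(file_size_mb, files_touched):
    # Copy-on-Write: every file containing an updated row is read and rewritten in full
    return file_size_mb * files_touched

# Hypothetical MERGE touching rows scattered across 40 files
large = rewritten_mb(256, 40)          # 10240 MB rewritten with 256MB files
small = rewritten_mb(64, 40)           # 2560 MB rewritten with 64MB files
amplification_saving = large // small  # 4x less data rewritten
```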
What are Deletion Vectors?
Normally, updating or deleting even a single row requires rewriting the entire file that contains it (Copy-on-Write). Deletion Vectors take a different approach: instead of rewriting files, they store a compact bitmap alongside each data file that marks which row positions have been invalidated.
During a MERGE with Deletion Vectors enabled, matched rows are marked as invalidated in their file's deletion vector, and the updated or newly inserted rows are written to new files.
The original data files remain untouched. This bypasses Copy-on-Write entirely — writes become much faster at the cost of slightly slower reads (readers must check the deletion vector to filter out invalidated rows). Periodic OPTIMIZE does two things: (1) merge small files (e.g., from new row inserts) into larger, optimally-sized files, and (2) rewrite files to physically remove rows marked as deleted via Deletion Vectors. For (2) specifically, the REORG command is also available as a dedicated alternative.
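A conceptual sketch of that reconciliation (this is a toy model, not the Delta implementation, which stores RoaringBitmap-based vectors in sidecar files):

```python
# Conceptual model only: one data file, its deletion vector, and a new file for updates
file_0 = ["row_a", "row_b_v1", "row_c"]   # immutable data file
deletion_vector = set()                    # row positions marked invalid in file_0
new_files = []                             # updated row versions land here

def dv_merge_update(position, new_row):
    deletion_vector.add(position)  # mark the old version invalid; file_0 is untouched
    new_files.append(new_row)      # write the new version to a fresh file

dv_merge_update(1, "row_b_v2")

# Readers reconcile: skip invalidated positions, then union in the new files
visible = [r for i, r in enumerate(file_0) if i not in deletion_vector] + new_files
# visible == ["row_a", "row_c", "row_b_v2"]
```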
Don't guess—measure. Run DESCRIBE HISTORY your_table after any MERGE. The operationMetrics map tells you exactly what happened.
Three metrics that matter most:
| Metric | What it tells you | Red flag |
|---|---|---|
| scanTimeMs | Time spent reading target files | Dominates total time → improve pruning |
| rewriteTimeMs | Time spent writing new files | Dominates total time → enable DVs or shrink files |
| numTargetRowsCopied | Unchanged rows rewritten alongside updated ones | Much larger than numTargetRowsUpdated → write amplification |
How to act on them: if scanTimeMs dominates, tighten your pruning (literal injection, clustering); if rewriteTimeMs dominates, enable Deletion Vectors or reduce target file size; if numTargetRowsCopied dwarfs numTargetRowsUpdated, your files are too large for your update pattern.
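Those red-flag checks can be sketched as a small triage helper over the operationMetrics map (values in DESCRIBE HISTORY are strings; the 10x copied-vs-updated threshold below is an arbitrary illustration, not a Delta-defined limit):

```python
def diagnose_merge(metrics):
    """Flag likely MERGE bottlenecks from a DESCRIBE HISTORY operationMetrics map."""
    scan = int(metrics.get("scanTimeMs", 0))
    rewrite = int(metrics.get("rewriteTimeMs", 0))
    copied = int(metrics.get("numTargetRowsCopied", 0))
    updated = int(metrics.get("numTargetRowsUpdated", 0))
    flags = []
    if scan > rewrite:
        flags.append("scan-bound: tighten literal injection / clustering")
    else:
        flags.append("write-bound: consider Deletion Vectors or smaller files")
    if updated > 0 and copied > 10 * updated:  # 10x threshold is illustrative
        flags.append("write amplification: unchanged rows dominate rewrites")
    return flags

# Example: a scan-dominated MERGE that also copies far more rows than it updates
flags = diagnose_merge({
    "scanTimeMs": "90000", "rewriteTimeMs": "5000",
    "numTargetRowsCopied": "2000000", "numTargetRowsUpdated": "5000",
})
```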