
MERGE slows down as the table grows, even with liquid clustering enabled.

filipniziol
Esteemed Contributor

Hi Everyone,

I have a source table, a target table, and a MERGE statement that inserts/updates records every couple of minutes.

The clustering keys are set up to match the 2 merge join columns.
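For context, the pattern looks roughly like this; a minimal sketch with placeholder table and column names (silver.target, staging.source, key_1, key_2), not the actual schema:

# Target table clustered by the two merge keys (placeholder names).
spark.sql("""
  CREATE TABLE IF NOT EXISTS silver.target (
    key_1 STRING,
    key_2 STRING,
    payload STRING,
    updated_at TIMESTAMP
  )
  CLUSTER BY (key_1, key_2)
""")

# The MERGE that runs every couple of minutes, joining on the clustering keys.
spark.sql("""
  MERGE INTO silver.target AS t
  USING staging.source AS s
    ON t.key_1 = s.key_1 AND t.key_2 = s.key_2
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")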

I noticed that the processing time increases over time, which means that as the target dataset grows it becomes harder and harder to find the matching records.

I would hope that liquid clustering works like an index, so matching records could always be found quickly, regardless of the size of the target dataset.


How can I solve this problem?


I am thinking about:
1. Getting rid of updates: make the silver table append-only and use a state table to hold the "current" records. Merge the source with the "current" records, then append the result to silver.

2. Creating another column, "is_current", and adding it to the liquid clustering keys. On every run, include is_current in the join condition, then set is_current to false for all records that do not belong to the current update (see the sketch after this list).
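Roughly what I have in mind for option 2, again with placeholder names and assuming old versions get flagged rather than overwritten:

from pyspark.sql.functions import lit

# Add the flag column and include it in the liquid clustering keys (placeholder names).
spark.sql("ALTER TABLE silver.target ADD COLUMNS (is_current BOOLEAN)")
spark.sql("ALTER TABLE silver.target CLUSTER BY (key_1, key_2, is_current)")

# Restrict the merge to current rows only and flag the previous versions
# of the incoming keys as no longer current ...
spark.sql("""
  MERGE INTO silver.target AS t
  USING staging.source AS s
    ON t.key_1 = s.key_1 AND t.key_2 = s.key_2 AND t.is_current = true
  WHEN MATCHED THEN UPDATE SET t.is_current = false
""")

# ... then append the incoming records as the new current versions.
(spark.table("staging.source")
   .withColumn("is_current", lit(True))
   .write.mode("append").saveAsTable("silver.target"))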

Any thoughts?

1 REPLY

kerem
Contributor

Hi @filipniziol ,

I dealt with a large table of about 1 TB in size with liquid clustering enabled. Even with liquid clustering, selects and joins on the clustered columns took longer as the table grew, so I don't think it stays equally fast as the table grows.

However, I handle incremental updates with an Auto Loader stream that covers both insert and upsert operations. Have you tried setting up a stream with Change Data Feed enabled? That saves you from the merge operation and leaves tracking down new and updated files to Auto Loader with its checkpoints and metadata.
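For reference, CDF is enabled through a table property on the Delta source table; a minimal sketch, where source_table is just a placeholder name:

# Enable the Change Data Feed on an existing Delta table (source_table is a placeholder).
spark.sql("""
  ALTER TABLE source_table
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")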

Here is a sample DLT (Lakeflow Declarative Pipelines) pipeline with CDF enabled:

# Preview the raw JSON source files (exploration only, not part of the pipeline)
display(spark.read.json("/demos/dlt/cdc_raw/customers"))

import dlt
from pyspark.sql.functions import *

# Create the bronze table containing the raw JSON data incrementally ingested
# from the cloud object storage landing zone with Auto Loader
@dlt.create_table(name="customers_raw_1",
                  comment="New customer data incrementally ingested from cloud object storage landing zone")
def customers_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.inferColumnTypes", "true")
      .load("/demos/dlt/cdc_raw/customers"))

# Create the target table
dlt.create_target_table(name="customers_2", comment="Clean, materialized customers")

# Apply the incoming changes to the target table
dlt.apply_changes(
  target = "customers_2",               # the customer table being materialized
  source = "customers_raw_1",           # the incoming CDC feed
  keys = ["id"],                        # columns used to match the rows to upsert
  sequence_by = col("operation_date"),  # deduplicate by operation date, keeping the most recent value
  ignore_null_updates = False,
  # apply_as_deletes = expr("operation = 'DELETE'"),  # if you have an operation column with insert/update/delete flags, you can use this DELETE condition
  except_column_list = ["operation", "operation_date"])  # drop the metadata columns

 

Kerem Durak