kerem
Contributor

Hi @filipniziol ,

I worked with a large table, about 1 TB in size, with liquid clustering enabled. Even so, selects and joins on the clustering columns got slower as the table grew, so I don't think liquid clustering alone keeps query times flat as the data grows.

For incremental updates, though, I use an Auto Loader stream to handle both inserts and upserts. Have you tried setting up a stream with Change Data Feed enabled? That saves you from writing the merge operation yourself and leaves the tracking of new and updated files to Auto Loader, with its checkpoints and metadata.

Here is sample DLT (Lakeflow Declarative Pipelines) pipeline code that ingests the change records with Auto Loader and upserts them with apply_changes:

# Optional: preview the raw files (exploratory; typically run in a regular notebook rather than in the pipeline)
display(spark.read.json("/demos/dlt/cdc_raw/customers"))

import dlt
from pyspark.sql.functions import col, expr  # expr is only needed if apply_as_deletes is enabled

# Create the bronze table containing the raw JSON data read incrementally from cloud object storage
@dlt.table(name="customers_raw_1",
           comment="New customer data incrementally ingested from cloud object storage landing zone")
def customers_raw():
  # Auto Loader (cloudFiles) keeps track of which files have already been ingested
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.inferColumnTypes", "true")
      .load("/demos/dlt/cdc_raw/customers"))

# Create the target streaming table
dlt.create_streaming_table(name="customers_2", comment="Clean, materialized customers")

dlt.apply_changes(
  target = "customers_2",               # the customer table being materialized
  source = "customers_raw_1",           # the incoming CDC records
  keys = ["id"],                        # columns used to match rows for the upsert
  sequence_by = col("operation_date"),  # deduplicate by operation date, keeping the most recent value
  ignore_null_updates = False,
  # apply_as_deletes = expr("operation = 'DELETE'"),  # uncomment if an operation column flags inserts, updates, and deletes
  except_column_list = ["operation", "operation_date"])  # drop the CDC metadata columns from the target
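
If it helps to see what the apply_changes call above does to the rows, here is a rough plain-Python sketch of the upsert logic as I understand it: keep the latest record per key by the sequencing column, drop the key if its latest record is a delete, and strip the metadata columns. The function name apply_changes_sketch and the sample events are made up for illustration; this is not how DLT implements it internally.

```python
def apply_changes_sketch(events, keys, sequence_by, except_columns=(), is_delete=None):
    """Illustrative upsert-by-key: the record with the highest sequence value wins."""
    latest = {}  # key tuple -> winning event for that key
    for event in events:
        key = tuple(event[k] for k in keys)
        current = latest.get(key)
        # A late-arriving record with an older sequence value does not overwrite a newer one
        if current is None or event[sequence_by] > current[sequence_by]:
            latest[key] = event

    target = {}
    for key, event in latest.items():
        if is_delete is not None and is_delete(event):
            continue  # the latest record is a delete, so the row is absent from the target
        # Drop the metadata columns, like except_column_list does
        target[key] = {c: v for c, v in event.items() if c not in except_columns}
    return target


# Hypothetical CDC events, not taken from a real dataset
events = [
    {"id": 1, "name": "Ann",  "operation": "APPEND", "operation_date": "2024-01-01"},
    {"id": 2, "name": "Bob",  "operation": "APPEND", "operation_date": "2024-01-01"},
    {"id": 1, "name": "Anna", "operation": "UPDATE", "operation_date": "2024-01-03"},
    {"id": 1, "name": "An",   "operation": "UPDATE", "operation_date": "2024-01-02"},  # arrives late
    {"id": 2, "name": None,   "operation": "DELETE", "operation_date": "2024-01-05"},
]

customers = apply_changes_sketch(
    events,
    keys=["id"],
    sequence_by="operation_date",
    except_columns=("operation", "operation_date"),
    is_delete=lambda e: e["operation"] == "DELETE",
)
print(customers)  # {(1,): {'id': 1, 'name': 'Anna'}}
```

Customer 1 ends up with the 2024-01-03 value even though an older update arrived after it, and customer 2 is gone because its most recent record is a delete. Without the is_delete argument (the commented-out apply_as_deletes case), customer 2 would stay in the target with a null name.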

 

Kerem Durak