Hi @filipniziol,
I have dealt with a table of roughly 1 TB with Liquid Clustering enabled. Even with Liquid Clustering, selects and joins on the clustering columns took longer as the table grew, so I would not expect it to hold its performance as the table keeps growing.
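For context, liquid clustering is declared with CLUSTER BY at table creation time. A minimal sketch of that kind of setup, with placeholder catalog, table, and column names rather than my actual ones:

spark.sql("""
    CREATE TABLE IF NOT EXISTS main.sales.transactions (
        id BIGINT,
        customer_id BIGINT,
        amount DECIMAL(18, 2),
        event_date DATE
    )
    CLUSTER BY (customer_id, event_date)
""")

# Clustering is applied incrementally to newly written data; OPTIMIZE triggers it on demand
spark.sql("OPTIMIZE main.sales.transactions")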
However, I handle incremental updates with an Auto Loader stream that covers both insert and upsert operations. Have you tried setting up a stream with Change Data Feed enabled? That saves you from writing the merge yourself and leaves tracking of new and updated files to Auto Loader via its checkpoints and metadata.
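If your source is already a Delta table rather than raw files, reading its change feed directly is one way to do this. A minimal sketch, assuming a placeholder table name and that you enable CDF on the source first:

# Enable the change data feed on the source Delta table (a one-time property change)
spark.sql("ALTER TABLE main.sales.customers SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Stream only the row-level changes (inserts, updates, deletes) instead of re-reading the whole table
changes = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table("main.sales.customers")
)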
Here is sample DLT (Lakeflow Declarative Pipelines) pipeline code for this pattern; enabling CDF on the target table is shown after the snippet:
# Preview the raw JSON files in the landing zone (run interactively; not part of the pipeline itself)
display(spark.read.json("/demos/dlt/cdc_raw/customers"))

import dlt
from pyspark.sql.functions import col, expr

# Bronze table: incrementally ingest the raw JSON files from cloud object storage with Auto Loader
@dlt.create_table(
    name="customers_raw_1",
    comment="New customer data incrementally ingested from cloud object storage landing zone")
def customers_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/demos/dlt/cdc_raw/customers"))

# Declare the target table and apply the incoming CDC records to it
dlt.create_target_table(name="customers_2", comment="Clean, materialized customers")

dlt.apply_changes(
    target = "customers_2",                # the customer table being materialized
    source = "customers_raw_1",            # the incoming CDC records
    keys = ["id"],                         # key used to match the rows to upsert
    sequence_by = col("operation_date"),   # deduplicate by operation date, keeping the most recent value
    ignore_null_updates = False,
    # apply_as_deletes = expr("operation = 'DELETE'"),  # if the feed carries insert/update/delete flags in an operation column, this applies the deletes
    except_column_list = ["operation", "operation_date"])  # drop the CDC metadata columns from the target
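If downstream consumers should also be able to stream changes out of the materialized table, the standard Delta change-feed property can be set when the target is declared; a sketch of the same call as above with the property added:

dlt.create_target_table(
    name="customers_2",
    comment="Clean, materialized customers",
    table_properties={"delta.enableChangeDataFeed": "true"})  # lets downstream jobs read row-level changes from customers_2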
Kerem Durak