Hi @yit337 !
This is expected for AUTO CDC with SCD2. It is not a simple append, because AUTO CDC must upsert the incoming CDC rows into the target based on the declared keys, and for SCD2 it also maintains historical versions via the `__START_AT` / `__END_AT` columns.
So Databricks has to find the matching existing target records before it can close the old version and insert the new one.
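To make that lookup cost concrete, here is a minimal plain-Python sketch of the SCD2 upsert pattern. This is purely illustrative (not the actual AUTO CDC implementation, and `scd2_upsert` is a hypothetical helper): for each change, the current version of the key must first be found and closed before the new version is appended, which is why this can never be a blind append.

```python
def scd2_upsert(target, change, seq):
    """Illustrative SCD2 upsert on a list of dict rows.

    target: list of dicts with 'key', 'value', '__START_AT', '__END_AT'.
    A row with __END_AT == None is the current version for its key.
    """
    for row in target:
        # Lookup step: find the open (current) version of this key...
        if row["key"] == change["key"] and row["__END_AT"] is None:
            row["__END_AT"] = seq  # ...and close it at the new sequence value
    # Insert step: append the new current version
    target.append({
        "key": change["key"],
        "value": change["value"],
        "__START_AT": seq,
        "__END_AT": None,
    })
    return target


history = [{"key": ("F1", 10), "value": "a", "__START_AT": 1, "__END_AT": None}]
scd2_upsert(history, {"key": ("F1", 10), "value": "b"}, seq=2)
# history now holds two versions: the old one closed at 2, the new one open
```

The "find the matching row" step is exactly where file pruning on the target table matters, which is what the clustering discussion below is about.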
Liquid clustering only helps if the generated plan can prune files using a selective predicate. Clustering on factory_code helps queries like `WHERE factory_code = 'X'`,
but if every micro-batch contains many or all of the 60 factory_code values, or if the MERGE/join plan generated by AUTO CDC does not produce a selective predicate on the target side, you can still see 0% file pruning.
DPP/DFP can also be skipped by the optimizer when the cost of building the pruning filter is not expected to pay off.
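To illustrate why selectivity matters, here is a toy model (my simplifying assumption: the table's files are perfectly clustered, one factory_code per file, and `pruned_fraction` is a hypothetical helper, not a Databricks API):

```python
def pruned_fraction(num_codes, codes_in_batch):
    """Toy model: fraction of files that can be skipped when each file
    holds exactly one factory_code and the batch filters on those codes."""
    files_scanned = len(set(codes_in_batch))
    return 1 - files_scanned / num_codes


# Selective batch touching one factory: most files can be skipped
pruned_fraction(60, ["F1"])                         # ~0.98 skipped

# Batch touching all 60 factories: nothing can be skipped
pruned_fraction(60, [f"F{i}" for i in range(60)])   # 0.0 skipped
```

This is why clustering on factory_code alone does little for a CDC feed whose micro-batches span most factories.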
To improve this, first make sure both columns are in the CDC keys. If the real business key is (factory_code, pk), define it like this:

```python
dp.create_auto_cdc_flow(
    target="gold_fact",
    source="silver_cdf_view",
    keys=["factory_code", "pk"],
    sequence_by=col("_commit_version"),
    stored_as_scd_type=2,
    apply_as_deletes=expr("_change_type = 'delete'")
)
```

If factory_code is only a normal column and not part of keys, clustering by it will not help the target lookup much.
You can also reduce the CDC source before AUTO CDC so you do not pass unnecessary CDF rows or columns into the flow. For example, exclude update_preimage rows unless you need them:
```python
@dp.view
def silver_cdf_view():
    return (
        spark.readStream
        .option("readChangeFeed", "true")
        .table("silver_table")
        .where("_change_type IN ('insert', 'update_postimage', 'delete')")
        .select(
            "factory_code",
            "pk",
            "col1",
            "col2",
            "_change_type",
            "_commit_version",
            "_commit_timestamp"
        )
    )
```

Note that `materializeSourceTimeMs` is source-side time ("time taken to materialize source or determine it is not needed"), so a complex CDF source view can be a big part of the cost too.
Also, clustering only by factory_code may not help much in your case, because I am assuming it is too low cardinality. Since pk is the actual lookup key, test one of these in a lower environment:

```python
@dp.table(cluster_by=["factory_code", "pk"])
def ...
```

or:

```python
@dp.table(cluster_by_auto=True)
def ...
```
These are the points I would look at, so please test and share your findings 🙂
If this answer resolves your question, could you please mark it as "Accept as Solution"? It will help other users quickly find the correct fix.
Senior BI/Data Engineer | Microsoft MVP Data Platform | Microsoft MVP Power BI | Power BI Super User | C# Corner MVP