Saturday
My Delta tables are stored in HANA Data Lake Files, and I have an ETL pipeline configured like below:
from pyspark import pipelines as dp

@dp.materialized_view(temporary=True)
def source():
    return spark.read.format("delta").load("/data/source")

@dp.materialized_view
def sink():
    return spark.read.table("source").withColumnRenamed("COL_A", "COL_B")

When I first ran the pipeline, it showed 100k records had been processed for both tables.
For the second run, since there were no updates to the source table, I was expecting no records to be processed. But the dashboard still shows 100k.
I also checked whether the source table has change data feed enabled by executing:
from delta.tables import DeltaTable

dt = DeltaTable.forPath(spark, "/data/source")
detail = dt.detail().collect()[0]
props = detail.asDict().get("properties", {})
for k, v in props.items():
    print(f"{k}: {v}")

and the result is:
pipelines.metastore.tableName: `default`.`source`
pipelines.pipelineId: 645fa38f-f6bf-45ab-a696-bd923457dc85
delta.enableChangeDataFeed: true

Does anybody know what I'm missing here?
Thanks in advance.
21 hours ago
Hi @anhnnguyen,
You defined the source for your CDF as a temporary view, and temporary views are always fully refreshed on every pipeline run.
Try defining it without that option.
18 hours ago
The issue comes from defining your source as a temporary materialized view:
@dp.materialized_view(temporary=True)
def source():
Temporary materialized views do not track state between pipeline runs. Because of that, the view is fully refreshed every time, so the pipeline always reprocesses the entire dataset instead of using CDF incrementally.
To fix this, remove temporary=True so the materialized view can maintain state and leverage Change Data Feed properly:
@dp.materialized_view
def source():
After making this change, your pipeline should only process incremental changes.
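If you want to verify that the source table is actually emitting change records, you can also read the change feed directly, outside the pipeline. A minimal sketch, assuming CDF was already enabled at the starting version you pass in:

# Read the change data feed from the Delta path to confirm CDF output.
# startingVersion = 1 is just an example; use a version after CDF was enabled.
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .load("/data/source")
)

# _change_type and _commit_version are metadata columns added by CDF.
changes.select("_change_type", "_commit_version").show()

If this returns no rows between two pipeline runs, the source genuinely has no changes to propagate.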
16 hours ago
After removing temporary=True, the pipeline still does a full recompute on every run, even though there are no changes in the source.
14 hours ago
One more note: I'm not using Unity Catalog here, not sure if that's relevant.
13 hours ago
Can you try registering it in UC as an external table? Additionally, if there is column masking or row filtering on the Delta table, it will always be a full recompute.
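For reference, registering the existing Delta path as a UC external table could look roughly like this. A minimal sketch; the catalog and schema names main.default are placeholders for your own:

# Register the existing Delta files at /data/source as an external table.
# "main" and "default" are placeholder catalog/schema names.
spark.sql("""
    CREATE TABLE IF NOT EXISTS main.default.source
    USING DELTA
    LOCATION '/data/source'
""")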
Also, your metadata shows CDF enabled, but row tracking is a separate table property. The best setup would be like this:
ALTER TABLE <table-name> SET TBLPROPERTIES (
    delta.enableDeletionVectors = true,
    delta.enableRowTracking = true,
    delta.enableChangeDataFeed = true);
5 hours ago
I tried enabling the 3 options as recommended in the documentation:

"delta.enableChangeDataFeed": "true"
"delta.enableRowTracking": "true"
"delta.enableDeletionVectors": "true"

but no luck. I will try registering it as an external table later, since my workspace has not enabled Unity Catalog yet.