DLT Serverless incremental refresh of materialized view
10-23-2024 05:53 AM - edited 10-23-2024 05:57 AM
I have a materialized view that always does a "COMPLETE_RECOMPUTE", but I can't figure out why.
I found out how to get the logs:
SELECT * FROM event_log(pipeline_id)
WHERE event_type = 'planning_information'
ORDER BY timestamp desc;
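For readers digging into the same problem: the `details` column of the event log holds the JSON payload, so you can drill straight into the planning section (I believe the `:` JSON-path syntax works here, but verify on your runtime):

```sql
-- Pull just the planning payload instead of the whole event row
SELECT
  timestamp,
  details:planning_information.technique_information AS techniques
FROM event_log(pipeline_id)
WHERE event_type = 'planning_information'
ORDER BY timestamp DESC;
```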
And for my table I got this info:
{
  "planning_information": {
    "technique_information": [
      {
        "maintenance_type": "MAINTENANCE_TYPE_GENERIC_AGGREGATE",
        "incrementalization_issues": [
          {
            "issue_type": "PLAN_NOT_INCREMENTALIZABLE",
            "prevent_incrementalization": true,
            "operator_name": "Aggregate",
            "plan_not_incrementalizable_sub_type": "AGGREGATE_NOT_TOP_NODE"
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_ROW_BASED",
        "incrementalization_issues": [
          {
            "issue_type": "PLAN_NOT_INCREMENTALIZABLE",
            "prevent_incrementalization": true,
            "operator_name": "Join",
            "join_type": "LEFT_OUTER"
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_COMPLETE_RECOMPUTE",
        "is_chosen": true,
        "is_applicable": true,
        "cost": 34936819266
      }
    ]
  }
}
It seems like there is an issue with the left outer join?
But normally that is a supported operation:
https://learn.microsoft.com/en-us/azure/databricks/optimizations/incremental-refresh#enzyme-support
10-23-2024 03:39 PM
The plan shows that the aggregation happens before the join. Can you ensure the join happens first and the aggregation last? Are you able to share the query?
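For what it's worth, a hypothetical restructuring along those lines, with the aggregate as the top-level operator (all table and column names here are placeholders, not from the original query):

```sql
-- Join first, aggregate last, so Aggregate is the top node of the plan
CREATE MATERIALIZED VIEW mv AS
SELECT s.Name, m.Id, COUNT(*) AS cnt
FROM source_data s
LEFT JOIN MetaData m
  ON s.colA = m.col1
GROUP BY s.Name, m.Id;
```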
10-23-2024 11:46 PM
Some of the processing happens in separate custom functions, but all of those functions consist of supported operations.
The aggregation is .groupBy("Name").count(), applied before anything else as a way to deduplicate the data (because 'distinct' is not supported but 'group by' is).
After that there is a bit of processing, and then finally a join on some fields extracted from the 'Name'.
So Materialized Views don't support joins as a final step?
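The shape described above, roughly, as SQL (names are placeholders):

```sql
-- Aggregate (dedup) first, then join as the final operator
WITH deduped AS (
  SELECT Name, COUNT(*) AS cnt
  FROM source_data
  GROUP BY Name
)
SELECT d.*, m.Id
FROM deduped d
LEFT JOIN MetaData m ON d.Name = m.col1;
```

With this shape the join, not the aggregate, sits at the top of the plan, which matches the AGGREGATE_NOT_TOP_NODE issue in the log.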
10-24-2024 12:58 AM - edited 10-24-2024 01:00 AM
I have split up the materialized view into three separate ones:
MV1: deduplicate by grouping:
@dlt.table(name="step1", table_properties={"delta.enableRowTracking": "true"})
def step1():
    isolate_names = dlt.read("source_data").select("Name").groupBy("Name").count()
    return isolate_names
MV2: use step1 and process the data (notable functions: split, slice, sha2, explode -> select expressions):
@dlt.table(name="step2", table_properties={"delta.enableRowTracking": "true"})
def step2():
    df = dlt.read("step1").select("Name")
    ...
MV3:
@dlt.table(name="step3", table_properties={"delta.enableRowTracking": "true"})
def step3():
    step2 = dlt.read("step2")
    meta = spark.table("MetaData").alias("meta")
    add_meta = (
        step2.alias("step2")
        .join(
            meta,
            on=[
                f.col("step2.colA") == f.col("meta.col1"),
                f.col("step2.colB") == f.col("meta.col2"),
            ],
            how="left",
        )
        .select("step2.*", "meta.Id")
    )
    return add_meta
For step 3 I now get these incrementalization issues:
{
  "planning_information": {
    "technique_information": [
      {
        "incrementalization_issues": [
          {
            "issue_type": "CDF_UNAVAILABLE",
            "prevent_incrementalization": true,
            "table_information": {
              "table_name": "step2"
            }
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_ROW_BASED",
        "incrementalization_issues": [
          {
            "issue_type": "ROW_TRACKING_NOT_ENABLED",
            "prevent_incrementalization": true,
            "table_information": {
              "table_name": "step2"
            }
          },
          {
            "issue_type": "PLAN_NOT_INCREMENTALIZABLE",
            "prevent_incrementalization": true,
            "operator_name": "Join",
            "join_type": "LEFT_OUTER"
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_COMPLETE_RECOMPUTE",
        "is_chosen": true,
        "is_applicable": true,
        "cost": 78952
      }
    ],
    "source_table_information": [
      {
        "table_name": "step2",
        "full_size": 3791,
        "is_size_after_pruning": true,
        "is_row_id_enabled": true,
        "is_cdf_enabled": true,
        "is_deletion_vector_enabled": false
      },
      {
        "table_name": "meta",
        "full_size": 1747,
        "is_size_after_pruning": true,
        "is_row_id_enabled": false,
        "is_cdf_enabled": false,
        "is_deletion_vector_enabled": true
      }
    ],
    "target_table_information": {
      "table_name": "step3",
      "full_size": 3943,
      "is_row_id_enabled": true,
      "is_cdf_enabled": true,
      "is_deletion_vector_enabled": false
    }
  }
}
For step1 and step2 I got these messages:
Step1 has been planned in DLT to be executed as GROUP_AGGREGATE. (-> incremental?)
Step2 has been planned in DLT to be executed as COMPLETE_RECOMPUTE. Another option is available:GROUP_AGGREGATE. COMPLETE_RECOMPUTE was chosen in the current run for its optimal performance.
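One thing the planning info above hints at (my reading, not confirmed in the thread): the meta source table has neither row tracking nor a change data feed enabled, and row-based incremental plans rely on both. A hypothetical fix on that source table, using the standard Delta table properties (verify they fit your tables before applying):

```sql
-- Enable change data feed and row tracking on the joined source table
ALTER TABLE MetaData SET TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true',
  'delta.enableRowTracking' = 'true'
);
```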
10-24-2024 02:04 AM
I split up the materialized view into three separate ones:
step1:
@dlt.table(name="step1", table_properties={"delta.enableRowTracking": "true"})
def step1():
    isolate_names = dlt.read("source").select("Name").groupBy("Name").count()
    return isolate_names
step2:
uses step1 and does some extra processing steps (notable functions: split, explode, slice, ... -> select expressions mostly)
@dlt.table(name="step2", table_properties={"delta.enableRowTracking": "true"})
def step2():
    asa_telemetry = dlt.read("step1").select("Name")
    ...
step3:
@dlt.table(name="step3", table_properties={"delta.enableRowTracking": "true"})
def step3():
    step2 = dlt.read("step2")
    meta = spark.table("MetaData").alias("meta")
    add_id = (
        step2.alias("step2")
        .join(
            meta,
            on=[
                f.col("step2.colA") == f.col("meta.col1"),
                f.col("step2.colB") == f.col("meta.col2"),
            ],
            how="left",
        )
        .select("step2.*", "meta.Id")
    )
    return add_id
For step 1 I get this message:
Flow 'step1' has been planned in DLT to be executed as GROUP_AGGREGATE.
For step2 I get this message:
Flow 'step2' has been planned in DLT to be executed as COMPLETE_RECOMPUTE. Another option is available:GROUP_AGGREGATE. COMPLETE_RECOMPUTE was chosen in the current run for its optimal performance.
So step1 and step2 don't have incrementalization issues.
Step 3 has issues:
{
  "planning_information": {
    "technique_information": [
      {
        "incrementalization_issues": [
          {
            "issue_type": "CDF_UNAVAILABLE",
            "prevent_incrementalization": true,
            "table_information": {
              "table_name": "step2"
            }
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_ROW_BASED",
        "incrementalization_issues": [
          {
            "issue_type": "ROW_TRACKING_NOT_ENABLED",
            "prevent_incrementalization": true,
            "table_information": {
              "table_name": "step2"
            }
          },
          {
            "issue_type": "PLAN_NOT_INCREMENTALIZABLE",
            "prevent_incrementalization": true,
            "operator_name": "Join",
            "join_type": "LEFT_OUTER"
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_COMPLETE_RECOMPUTE",
        "is_chosen": true,
        "is_applicable": true,
        "cost": 78952
      }
    ],
    "source_table_information": [
      {
        "table_name": "step2",
        "table_id": "bd21e05c-8011-485a-8f0f-8dc82656d31e",
        "full_size": 3791,
        "is_size_after_pruning": true,
        "is_row_id_enabled": true,
        "is_cdf_enabled": true,
        "is_deletion_vector_enabled": false
      },
      {
        "table_name": "MetaData",
        "full_size": 1747,
        "is_size_after_pruning": true,
        "is_row_id_enabled": false,
        "is_cdf_enabled": false,
        "is_deletion_vector_enabled": true
      }
    ],
    "target_table_information": {
      "table_name": "step3",
      "full_size": 3943,
      "is_row_id_enabled": true,
      "is_cdf_enabled": true,
      "is_deletion_vector_enabled": false
    }
  }
}
So the problem really seems to be the left join?

