10-23-2024 05:53 AM - edited 10-23-2024 05:57 AM
I have a materialized view that always does a "COMPLETE_RECOMPUTE", but I can't figure out why.
I found out how to get the logs:
SELECT * FROM event_log(pipeline_id)
WHERE event_type = 'planning_information'
ORDER BY timestamp desc;
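The same query also runs from a notebook; a rough sketch (the pipeline id is a placeholder, and only the planning details are kept):
# Sketch: same event-log query via PySpark, narrowed to the planning details.
# '<pipeline-id>' is a placeholder; `details` is the event log's JSON column.
planning = spark.sql("""
    SELECT timestamp, details:planning_information AS planning_information
    FROM event_log('<pipeline-id>')
    WHERE event_type = 'planning_information'
    ORDER BY timestamp DESC
""")
planning.show(truncate=False)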
And for my table I got this info:
{
"planning_information": {
"technique_information": [
{
"maintenance_type": "MAINTENANCE_TYPE_GENERIC_AGGREGATE",
"incrementalization_issues": [
{
"issue_type": "PLAN_NOT_INCREMENTALIZABLE",
"prevent_incrementalization": true,
"operator_name": "Aggregate",
"plan_not_incrementalizable_sub_type": "AGGREGATE_NOT_TOP_NODE"
}
]
},
{
"maintenance_type": "MAINTENANCE_TYPE_ROW_BASED",
"incrementalization_issues": [
{
"issue_type": "PLAN_NOT_INCREMENTALIZABLE",
"prevent_incrementalization": true,
"operator_name": "Join",
"join_type": "LEFT_OUTER"
}
]
},
{
"maintenance_type": "MAINTENANCE_TYPE_COMPLETE_RECOMPUTE",
"is_chosen": true,
"is_applicable": true,
"cost": 34936819266
}
]
}
}
It seems like there is an issue with the left outer join?
But that is normally a supported operation:
https://learn.microsoft.com/en-us/azure/databricks/optimizations/incremental-refresh#enzyme-support
10-23-2024 03:39 PM
From the plan, it looks like the aggregation is happening before the join. Can you ensure the join happens first and then the aggregation? Are you able to share the query?
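Something along these lines, sketched with placeholder table and column names (not your actual query):
# Rough sketch of the suggested ordering with placeholder names:
# join first, then aggregate, so the Aggregate ends up as the top node of the plan.
from pyspark.sql import functions as f

src = spark.table("source_table").alias("src")       # placeholder source table
meta = spark.table("metadata_table").alias("meta")   # placeholder metadata table

joined = src.join(
    meta,
    on=[f.col("src.join_key") == f.col("meta.join_key")],
    how="left",
)

# aggregation as the final (top) operator
result = joined.groupBy(f.col("src.Name")).count()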
10-23-2024 11:46 PM
Some of the logic happens in separate custom functions, but all of those functions consist of supported operations.
The aggregation is a .groupBy("Name").count() at the very start, as a way to deduplicate the data (because 'distinct' is not supported but 'group by' is).
After that there is a bit of processing, and then finally a join on some fields extracted from the 'Name'.
So Materialized Views don't support a join as the final step?
10-24-2024 12:58 AM - edited 10-24-2024 01:00 AM
I have split up the materialized view into 3 separate ones:
MV1: deduplicate by grouping:
import dlt
from pyspark.sql import functions as f  # `f` is used in step3 below

@dlt.table(name="step1", table_properties={"delta.enableRowTracking": "true"})
def step1():
    # deduplicate the names via groupBy instead of distinct
    isolate_names = dlt.read("source_data").select("Name").groupBy("Name").count()
    return isolate_names
MV2:
step2: uses step1 and processes the data (notable functions: split, slice, sha2, explode -> mostly select expressions; an illustrative sketch of this kind of processing follows after the code)
@dlt.table(name="step2", table_properties={"delta.enableRowTracking": "true"})
def step2():
    df = dlt.read("step1").select("Name")
    ...
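Purely as an illustration of the kind of processing in step2 (the real column names and expressions are different):
# Illustrative only - the real step2 logic differs. Shows the kind of
# split / slice / sha2 / explode select expressions applied on top of step1.
from pyspark.sql import functions as f

def step2_sketch(df):
    return (
        df.withColumn("parts", f.split(f.col("Name"), "_"))      # split the name
          .withColumn("parts", f.slice(f.col("parts"), 1, 2))    # keep the first two parts
          .withColumn("part", f.explode(f.col("parts")))         # one row per part
          .withColumn("hash", f.sha2(f.col("Name"), 256))        # hash of the full name
          .select("Name", "part", "hash")
    )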
MV3:
@dlt.table(name="step3", table_properties={"delta.enableRowTracking": "true"})
def step3():
    step2 = dlt.read("step2")
    meta = spark.table("MetaData").alias("meta")
    add_meta = (
        step2.alias("step2")
        .join(
            meta,
            on=[
                f.col("step2.colA") == f.col("meta.col1"),
                f.col("step2.colB") == f.col("meta.col2"),
            ],
            how="left",
        )
        .select("step2.*", "meta.Id")
    )
    return add_meta
For step 3 I now get these incrementalization issues:
{
"planning_information": {
"technique_information": [
{
"incrementalization_issues": [
{
"issue_type": "CDF_UNAVAILABLE",
"prevent_incrementalization": true,
"table_information": {
"table_name": "step2",
}
}
]
},
{
"maintenance_type": "MAINTENANCE_TYPE_ROW_BASED",
"incrementalization_issues": [
{
"issue_type": "ROW_TRACKING_NOT_ENABLED",
"prevent_incrementalization": true,
"table_information": {
"table_name": "step2",
}
},
{
"issue_type": "PLAN_NOT_INCREMENTALIZABLE",
"prevent_incrementalization": true,
"operator_name": "Join",
"join_type": "LEFT_OUTER"
}
]
},
{
"maintenance_type": "MAINTENANCE_TYPE_COMPLETE_RECOMPUTE",
"is_chosen": true,
"is_applicable": true,
"cost": 78952
}
],
"source_table_information": [
{
"table_name": "step2",
"full_size": 3791,
"is_size_after_pruning": true,
"is_row_id_enabled": true,
"is_cdf_enabled": true,
"is_deletion_vector_enabled": false
},
{
"table_name": "meta",
"full_size": 1747,
"is_size_after_pruning": true,
"is_row_id_enabled": false,
"is_cdf_enabled": false,
"is_deletion_vector_enabled": true
}
],
"target_table_information": {
"table_name": "step3",
"full_size": 3943,
"is_row_id_enabled": true,
"is_cdf_enabled": true,
"is_deletion_vector_enabled": false
}
}
}
For step1 and step2 I got these messages:
Step1 has been planned in DLT to be executed as GROUP_AGGREGATE. (-> incremental?)
Step2 has been planned in DLT to be executed as COMPLETE_RECOMPUTE. Another option is available: GROUP_AGGREGATE. COMPLETE_RECOMPUTE was chosen in the current run for its optimal performance.
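As a side note, both row tracking and change data feed can be requested via table_properties; delta.enableChangeDataFeed is a standard Delta property, though whether that clears the CDF_UNAVAILABLE / ROW_TRACKING_NOT_ENABLED issues reported for step2 is something I haven't verified:
# Sketch: requesting both row tracking and change data feed on the intermediate
# table. delta.enableChangeDataFeed is a standard Delta property; whether it
# resolves the CDF_UNAVAILABLE issue for step2 here is unverified.
import dlt

@dlt.table(
    name="step2",
    table_properties={
        "delta.enableRowTracking": "true",
        "delta.enableChangeDataFeed": "true",
    },
)
def step2():
    df = dlt.read("step1").select("Name")
    # ... same processing as above ...
    return df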
10-24-2024 02:04 AM
I split up the materialized view into 3 separate ones:
step1:
@dlt.table(name="step1", table_properties={"delta.enableRowTracking": "true"})
def step1():
    isolate_names = dlt.read("source_data").select("Name").groupBy("Name").count()
    return isolate_names
step2:
uses step1 and does some extra processing steps (notable functions: split, explode, slice, ... -> mostly select expressions)
@dlt.table(name="step2", table_properties={"delta.enableRowTracking": "true"})
def step2():
    asa_telemetry = dlt.read("step1").select("Name")
    ...
step3:
@dlt.table(name="step3", table_properties={"delta.enableRowTracking": "true"})
def step3():
    step2 = dlt.read("step2")
    meta = spark.table("MetaData").alias("meta")
    add_id = (
        step2.alias("step2")
        .join(
            meta,
            on=[
                f.col("step2.colA") == f.col("meta.col1"),
                f.col("step2.colB") == f.col("meta.col2"),
            ],
            how="left",
        )
        .select("step2.*", "meta.Id")
    )
    return add_id
For step 1 I get this message:
Flow 'step1' has been planned in DLT to be executed as GROUP_AGGREGATE.
For step2 I get this message:
Flow 'step2' has been planned in DLT to be executed as COMPLETE_RECOMPUTE. Another option is available: GROUP_AGGREGATE. COMPLETE_RECOMPUTE was chosen in the current run for its optimal performance.
So step1 and step2 don't have incrementalization issues.
Step 3 has issues:
{
"planning_information": {
"technique_information": [
{
"incrementalization_issues": [
{
"issue_type": "CDF_UNAVAILABLE",
"prevent_incrementalization": true,
"table_information": {
"table_name": "step2",
}
}
]
},
{
"maintenance_type": "MAINTENANCE_TYPE_ROW_BASED",
"incrementalization_issues": [
{
"issue_type": "ROW_TRACKING_NOT_ENABLED",
"prevent_incrementalization": true,
"table_information": {
"table_name": "step2",
}
},
{
"issue_type": "PLAN_NOT_INCREMENTALIZABLE",
"prevent_incrementalization": true,
"operator_name": "Join",
"join_type": "LEFT_OUTER"
}
]
},
{
"maintenance_type": "MAINTENANCE_TYPE_COMPLETE_RECOMPUTE",
"is_chosen": true,
"is_applicable": true,
"cost": 78952
}
],
"source_table_information": [
{
"table_name": "`step2",
"table_id": "bd21e05c-8011-485a-8f0f-8dc82656d31e",
"full_size": 3791,
"is_size_after_pruning": true,
"is_row_id_enabled": true,
"is_cdf_enabled": true,
"is_deletion_vector_enabled": false
},
{
"table_name": "MetaData",
"full_size": 1747,
"is_size_after_pruning": true,
"is_row_id_enabled": false,
"is_cdf_enabled": false,
"is_deletion_vector_enabled": true
}
],
"target_table_information": {
"table_name": "step3",
"full_size": 3943,
"is_row_id_enabled": true,
"is_cdf_enabled": true,
"is_deletion_vector_enabled": false
}
}
}
So the problem really seems to be the left join?