Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
DLT Serverless incremental refresh of materialized view

L1000
New Contributor III

I have a materialized view that always does a "COMPLETE_RECOMPUTE", and I can't figure out why.
I found out how to get the planning logs:

 

 

SELECT * FROM event_log(pipeline_id)
WHERE event_type = 'planning_information'
ORDER BY timestamp desc;

 

 

 

And for my table I got this info:

 

{
  "planning_information": {
    "technique_information": [
      {
        "maintenance_type": "MAINTENANCE_TYPE_GENERIC_AGGREGATE",
        "incrementalization_issues": [
          {
            "issue_type": "PLAN_NOT_INCREMENTALIZABLE",
            "prevent_incrementalization": true,
            "operator_name": "Aggregate",
            "plan_not_incrementalizable_sub_type": "AGGREGATE_NOT_TOP_NODE"
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_ROW_BASED",
        "incrementalization_issues": [
          {
            "issue_type": "PLAN_NOT_INCREMENTALIZABLE",
            "prevent_incrementalization": true,
            "operator_name": "Join",
            "join_type": "LEFT_OUTER"
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_COMPLETE_RECOMPUTE",
        "is_chosen": true,
        "is_applicable": true,
        "cost": 34936819266
      }
    ]
  }
}
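
For longer logs it can help to condense the planning information into just the blocking issues. The following is a minimal sketch, assuming the `details` column of the event log has been fetched as a JSON string with the shape shown above; the function name `blocking_issues` and the `SAMPLE` string are made up for illustration and only use fields that appear in the log.

```python
import json

# Trimmed copy of the log above (assumption: details arrives as a JSON string).
SAMPLE = """
{"planning_information": {"technique_information": [
  {"maintenance_type": "MAINTENANCE_TYPE_GENERIC_AGGREGATE",
   "incrementalization_issues": [
     {"issue_type": "PLAN_NOT_INCREMENTALIZABLE",
      "prevent_incrementalization": true,
      "operator_name": "Aggregate",
      "plan_not_incrementalizable_sub_type": "AGGREGATE_NOT_TOP_NODE"}]},
  {"maintenance_type": "MAINTENANCE_TYPE_ROW_BASED",
   "incrementalization_issues": [
     {"issue_type": "PLAN_NOT_INCREMENTALIZABLE",
      "prevent_incrementalization": true,
      "operator_name": "Join",
      "join_type": "LEFT_OUTER"}]},
  {"maintenance_type": "MAINTENANCE_TYPE_COMPLETE_RECOMPUTE",
   "is_chosen": true, "is_applicable": true, "cost": 34936819266}
]}}
"""


def blocking_issues(details_json):
    """Return (maintenance_type, summary) pairs for issues that prevent incrementalization."""
    info = json.loads(details_json)["planning_information"]
    found = []
    for technique in info.get("technique_information", []):
        mtype = technique.get("maintenance_type", "UNSPECIFIED")
        for issue in technique.get("incrementalization_issues", []):
            if not issue.get("prevent_incrementalization"):
                continue
            # Collect the optional detail fields that are present on this issue.
            parts = [issue["issue_type"]]
            for key in ("operator_name", "join_type", "plan_not_incrementalizable_sub_type"):
                if key in issue:
                    parts.append(issue[key])
            found.append((mtype, " / ".join(parts)))
    return found


for row in blocking_issues(SAMPLE):
    print(row)
```

For this log the summary shows two rejected techniques: the aggregate-based one (the aggregate is not the top node of the plan) and the row-based one (the left outer join).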

 



The log seems to point at the left outer join as the problem.
But according to the documentation, that is a supported operation:

https://learn.microsoft.com/en-us/azure/databricks/optimizations/incremental-refresh#enzyme-support

4 REPLIES

randomquartile
New Contributor II

The plan indicates that the aggregation happens before the join. Can you make sure the join happens first and the aggregation afterwards? Are you able to share the query?

L1000
New Contributor III

Some of the logic lives in separate custom functions, but all of those functions consist of supported operations.

The aggregation is .groupBy("Name").count(), applied before anything else as a way to deduplicate the data (because 'distinct' is not supported but 'group by' is).
After that there is a bit of processing, and finally a join on some fields extracted from 'Name'.

So materialized views don't support joins as a final step?

L1000
New Contributor III

I have split the materialized view up into three separate ones:

MV1: deduplicate by grouping:

 

import dlt

@dlt.table(name="step1", table_properties={"delta.enableRowTracking": "true"})
def step1():
    # Group by Name to deduplicate (used in place of distinct)
    isolate_names = dlt.read("source_data").select("Name").groupBy("Name").count()
    return isolate_names

 


MV2: use step1 and process the data (notable functions: split, slice, sha2, explode; mostly select expressions)

 

@dlt.table(name="step2", table_properties={"delta.enableRowTracking": "true"})
def step2():
    df = dlt.read("step1").select("Name")
...

 



MV3:

 

@dlt.table(name="step3", table_properties={"delta.enableRowTracking": "true"})
def step3():
    step2 = dlt.read("step2")

    meta = spark.table("MetaData").alias("meta")

    add_meta = (
        step2.alias("step2")
        .join(
            meta,
            on=[
                f.col("step2.colA") == f.col("meta.col1"),
                f.col("step2.colB") == f.col("meta.col2"),
            ],
            how="left",
        )
        .select("step2.*", "meta.Id")
    )

    return add_meta

 


For step 3 I now get these incrementalization issues:

 

{
  "planning_information": {
    "technique_information": [
      {
        "incrementalization_issues": [
          {
            "issue_type": "CDF_UNAVAILABLE",
            "prevent_incrementalization": true,
            "table_information": {
              "table_name": "step2"
            }
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_ROW_BASED",
        "incrementalization_issues": [
          {
            "issue_type": "ROW_TRACKING_NOT_ENABLED",
            "prevent_incrementalization": true,
            "table_information": {
              "table_name": "step2"
            }
          },
          {
            "issue_type": "PLAN_NOT_INCREMENTALIZABLE",
            "prevent_incrementalization": true,
            "operator_name": "Join",
            "join_type": "LEFT_OUTER"
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_COMPLETE_RECOMPUTE",
        "is_chosen": true,
        "is_applicable": true,
        "cost": 78952
      }
    ],
    "source_table_information": [
      {
        "table_name": "step2",
        "full_size": 3791,
        "is_size_after_pruning": true,
        "is_row_id_enabled": true,
        "is_cdf_enabled": true,
        "is_deletion_vector_enabled": false
      },
      {
        "table_name": "meta",
        "full_size": 1747,
        "is_size_after_pruning": true,
        "is_row_id_enabled": false,
        "is_cdf_enabled": false,
        "is_deletion_vector_enabled": true
      }
    ],
    "target_table_information": {
      "table_name": "step3",
      "full_size": 3943,
      "is_row_id_enabled": true,
      "is_cdf_enabled": true,
      "is_deletion_vector_enabled": false
    }
  }
}
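
One thing this log makes visible: the issues name step2 for CDF_UNAVAILABLE and ROW_TRACKING_NOT_ENABLED, yet source_table_information reports both flags as true for step2 and as false for the second source table. Whether that second table is the actual cause is not established in this thread. As a minimal sketch, assuming the planning information has already been parsed into a Python dict, a small helper (the name `tables_missing_features` is made up here) can list the source tables that report either feature as disabled:

```python
def tables_missing_features(planning_information):
    """Map each source table to the tracking flags it reports as disabled."""
    flags = ("is_row_id_enabled", "is_cdf_enabled")
    missing = {}
    for table in planning_information.get("source_table_information", []):
        # A flag that is absent is treated as disabled (an assumption of this sketch).
        disabled = [flag for flag in flags if not table.get(flag, False)]
        if disabled:
            missing[table["table_name"]] = disabled
    return missing


# Values copied from the source_table_information section of the log above.
sample = {
    "source_table_information": [
        {"table_name": "step2", "is_row_id_enabled": True, "is_cdf_enabled": True},
        {"table_name": "meta", "is_row_id_enabled": False, "is_cdf_enabled": False},
    ]
}

print(tables_missing_features(sample))
```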

 

For step1 and step2 I got these messages:
Step1 has been planned in DLT to be executed as GROUP_AGGREGATE. (-> incremental?)
Step2 has been planned in DLT to be executed as COMPLETE_RECOMPUTE. Another option is available:GROUP_AGGREGATE. COMPLETE_RECOMPUTE was chosen in the current run for its optimal performance.

 

L1000
New Contributor III

I split the materialized view up into three separate ones:

step1:

@dlt.table(name="step1", table_properties={"delta.enableRowTracking": "true"})
def step1():
    isolate_names = dlt.read("source").select("Name").groupBy("Name").count()
    return isolate_names



step2:
uses step1 and does some extra processing steps (notable functions: split, explode, slice, ... -> select expressions mostly)

@dlt.table(name="step2", table_properties={"delta.enableRowTracking": "true"})
def step2():
    asa_telemetry = dlt.read("step1").select("Name")
....



step3:

@dlt.table(name="step3", table_properties={"delta.enableRowTracking": "true"})
def step3():
    step2 = dlt.read("step2")

    meta = spark.table("MetaData").alias("meta")

    add_id = (
        step2.alias("step2")
        .join(
            meta,
            on=[
                f.col("step2.colA") == f.col("meta.col1"),
                f.col("step2.colB") == f.col("meta.col2"),
            ],
            how="left",
        )
        .select("step2.*", "meta.Id")
    )

    return add_id



For step 1 I get this message:
Flow 'step1' has been planned in DLT to be executed as GROUP_AGGREGATE.

For step2 I get this message:
Flow 'step2' has been planned in DLT to be executed as COMPLETE_RECOMPUTE. Another option is available:GROUP_AGGREGATE. COMPLETE_RECOMPUTE was chosen in the current run for its optimal performance.

So step1 and step2 don't report any incrementalization issues.
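
When there are many flows, these planning messages can be scanned automatically. The sketch below assumes the messages follow exactly the wording quoted above; the pattern and the function name `parse_plan_message` are illustrative and not part of any Databricks API. Note that step2 reports no issues but still ran as COMPLETE_RECOMPUTE in that run, which the parsed result makes easy to spot.

```python
import re

# Pattern derived from the two messages quoted above (assumed wording).
PLAN_MESSAGE = re.compile(
    r"Flow '(?P<flow>[^']+)' has been planned in DLT to be executed as (?P<chosen>[A-Z_]+)\."
    r"(?: Another option is available:\s*(?P<alternative>[A-Z_]+)\.)?"
)


def parse_plan_message(message):
    """Return the flow name, chosen technique and optional alternative, or None."""
    match = PLAN_MESSAGE.search(message)
    if match is None:
        return None
    return match.groupdict()


messages = [
    "Flow 'step1' has been planned in DLT to be executed as GROUP_AGGREGATE.",
    "Flow 'step2' has been planned in DLT to be executed as COMPLETE_RECOMPUTE. "
    "Another option is available:GROUP_AGGREGATE. COMPLETE_RECOMPUTE was chosen "
    "in the current run for its optimal performance.",
]

for message in messages:
    print(parse_plan_message(message))
```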

Step 3 has issues:

{
  "planning_information": {
    "technique_information": [
      {
        "incrementalization_issues": [
          {
            "issue_type": "CDF_UNAVAILABLE",
            "prevent_incrementalization": true,
            "table_information": {
              "table_name": "step2"
            }
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_ROW_BASED",
        "incrementalization_issues": [
          {
            "issue_type": "ROW_TRACKING_NOT_ENABLED",
            "prevent_incrementalization": true,
            "table_information": {
              "table_name": "step2"
            }
          },
          {
            "issue_type": "PLAN_NOT_INCREMENTALIZABLE",
            "prevent_incrementalization": true,
            "operator_name": "Join",
            "join_type": "LEFT_OUTER"
          }
        ]
      },
      {
        "maintenance_type": "MAINTENANCE_TYPE_COMPLETE_RECOMPUTE",
        "is_chosen": true,
        "is_applicable": true,
        "cost": 78952
      }
    ],
    "source_table_information": [
      {
        "table_name": "step2",
        "table_id": "bd21e05c-8011-485a-8f0f-8dc82656d31e",
        "full_size": 3791,
        "is_size_after_pruning": true,
        "is_row_id_enabled": true,
        "is_cdf_enabled": true,
        "is_deletion_vector_enabled": false
      },
      {
        "table_name": "MetaData",
        "full_size": 1747,
        "is_size_after_pruning": true,
        "is_row_id_enabled": false,
        "is_cdf_enabled": false,
        "is_deletion_vector_enabled": true
      }
    ],
    "target_table_information": {
      "table_name": "step3",
      "full_size": 3943,
      "is_row_id_enabled": true,
      "is_cdf_enabled": true,
      "is_deletion_vector_enabled": false
    }
  }
}

So the problem really seems to be the left join?
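
One experiment that might help narrow this down, offered as an untested assumption rather than a confirmed fix: the log shows MetaData with is_row_id_enabled and is_cdf_enabled both false, so enabling the same Delta table properties there would rule that out before blaming the join itself. The sketch below only builds the SQL text; the helper name `enable_tracking_statements` is made up, while `delta.enableRowTracking` and `delta.enableChangeDataFeed` are the standard Delta table properties.

```python
def enable_tracking_statements(table_names):
    """Build one ALTER TABLE statement per table that turns on row tracking and CDF."""
    template = (
        "ALTER TABLE {name} SET TBLPROPERTIES ("
        "'delta.enableRowTracking' = 'true', "
        "'delta.enableChangeDataFeed' = 'true')"
    )
    return [template.format(name=name) for name in table_names]


for statement in enable_tracking_statements(["MetaData"]):
    # On Databricks the statement could be run with spark.sql(statement).
    print(statement)
```

If the planner still reports PLAN_NOT_INCREMENTALIZABLE for the LEFT_OUTER join afterwards, the source table properties can be excluded as the cause.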
