yesterday
Hi everyone,
- the situation:
I have a Declarative pipeline that consists of several .py files.
file1.py: creates a materialized view, description.
file2.py: creates a second materialized view by reading the table "transactions" and the MV "description", then joining them. Let's call it trans_with_description.
file2.py: reads trans_with_description and does further processing. Because trans_with_description has array columns (col_a, col_b), I call a function to flatten them. The output is final_result.
---------
- the problem:
When I run the DLT pipeline with "full table refresh", final_result has 0 rows. If I run the pipeline again (without a full refresh), all the expected rows are there, flattened.
If I remove the flatten-array operation and run the pipeline, final_result has the expected rows, with the array columns still as arrays.
What could be the cause of the problem?
I also noticed that without the flatten-array step, the pipeline graph shows every element wired correctly.
Whenever I add the flatten-array step, final_result is detached from the rest of the pipeline.
--
Compute: serverless
4 - Python 3.12, Scala 2.13, Java 17
22 hours ago
Hi @cdn_yyz_yul,
My best guess is that the issue is being triggered by the array-flattening step rather than by the join itself. In Databricks, materialized views are maintained via refresh logic, and the public documentation on incremental refresh for materialized views explains that not every query shape can be processed incrementally. The same documentation also notes that when a query contains unsupported expressions or operators, Databricks may fall back to a full recompute instead of using the incremental refresh path.
That lines up with the behaviour you described. When the flattening step is removed, the rows are present. When the flattening logic is added back, the first full refresh returns zero rows, but the next normal run produces the expected flattened output. That usually suggests that the query shape around the array expansion is what misbehaves on the full-refresh (recompute) path, even though the upstream join itself is valid and the source data is available.
As a practical workaround, I would try separating the pipeline into two stages. First, materialise trans_with_description as-is. Then perform the array expansion in a separate downstream table or processing step. This usually makes it easier to confirm whether the full-refresh issue is specifically tied to the flattening logic. If you want to validate further, you can also review the public docs for pipeline refresh semantics, materialized views, and incremental refresh behavior, and test the query with EXPLAIN CREATE MATERIALIZED VIEW.
If useful, a simplified version of that pattern would look like this:
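```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

# `spark` is provided by the pipeline runtime inside pipeline source files.


@dp.materialized_view(name="trans_with_description")
def trans_with_description():
    tx = spark.read.table("transactions")
    desc = spark.read.table("description")
    # Stage 1: join only, no array flattening here.
    return tx.join(desc, "id", "left")


# Batch read, so declare it explicitly as a materialized view.
@dp.materialized_view(name="final_result")
def final_result():
    df = spark.read.table("trans_with_description")
    # Stage 2: flatten in a separate downstream dataset. Two chained
    # explode_outer calls yield one row per (col_a, col_b) combination.
    return (
        df
        .withColumn("col_a_item", F.explode_outer("col_a"))
        .withColumn("col_b_item", F.explode_outer("col_b"))
    )
```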
This is not necessarily the only valid implementation, but it is a useful way to isolate whether the refresh problem is specifically coming from the flattening step.
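One thing worth checking in that sketch: chaining two explode_outer calls produces one row per (col_a, col_b) combination, not one row per array position. The plain-Python model below is not Spark code; `explode_outer` here is a hypothetical helper that mimics what `F.explode_outer` does when used with `withColumn` (each element becomes its own row, and a null or empty array still yields one row with a null item). The sample rows are made up for illustration.

```python
from typing import Any

Row = dict[str, Any]


def explode_outer(rows: list[Row], col: str, out: str) -> list[Row]:
    """Plain-Python model of withColumn(out, F.explode_outer(col)).

    Each element of row[col] becomes its own output row; a null or empty
    array still produces exactly one row, with None in the output column.
    The original columns (including the array itself) are kept.
    """
    result: list[Row] = []
    for row in rows:
        items = row.get(col) or [None]  # null/empty array -> single None item
        for item in items:
            result.append({**row, out: item})
    return result


# Hypothetical sample data with two array columns, as in the question.
rows = [
    {"id": 1, "col_a": ["x", "y"], "col_b": [10, 20, 30]},
    {"id": 2, "col_a": [], "col_b": None},
]
flat = explode_outer(explode_outer(rows, "col_a", "col_a_item"), "col_b", "col_b_item")
print(len(flat))  # 7: 2 x 3 combinations for id 1, plus one all-null row for id 2
```

If col_a and col_b are positionally aligned, combining them with `F.arrays_zip` before a single `F.explode_outer` avoids the cross product; whether that matches what the existing flatten function intends is worth confirming against it.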
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.
18 hours ago
@Ashwin_DSA
Thanks for the documentation. It will help us understand DLT better and improve our code.
In this particular case, when I set "pipeline compute" to None, all the problems disappeared.
As for the flatten-array function, it is already used by several other pipelines in production. Regardless, the documentation will help us improve it.
10 hours ago
Hi @cdn_yyz_yul,
To me, this looks like a dependency-resolution issue rather than a question of refresh/compute semantics. The DLT compiler is highly restrictive about "query shape": it would quickly flag any unsupported query and fall back to "FULL REFRESH" mode, rather than silently producing an empty result.
As for the inconsistent results between modes, it may simply be a processing-timing issue, given that you mentioned the final table is detached in the pipeline graph.
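The timing hypothesis above can be illustrated with a toy scheduler. This is a plain-Python sketch under simplifying assumptions, not how the Databricks pipeline scheduler works: the dataset names come from the thread, but the integer rows, `run_pipeline`, `WIRED`/`DETACHED`, and the rule that a full refresh empties every pipeline dataset (but not source tables) before recomputing are all hypothetical. It shows how, if final_result has no declared edge to trans_with_description, a full refresh can compute it from an empty upstream, while the next normal run reads the previous run's output.

```python
from graphlib import TopologicalSorter
from typing import Callable

# Toy model only: each dataset is a list of integer ids.
Tables = dict[str, list[int]]
Definition = Callable[[Tables], list[int]]

# Hypothetical dataset definitions mirroring the pipeline in the question.
DEFINITIONS: dict[str, Definition] = {
    "description": lambda t: [1, 2, 3],
    # Keep transactions that have a matching description (join stand-in).
    "trans_with_description": lambda t: [
        r for r in t["transactions"] if r in t["description"]
    ],
    # Stand-in for the flatten step: copy whatever upstream holds right now.
    "final_result": lambda t: list(t["trans_with_description"]),
}


def run_pipeline(deps: dict[str, list[str]], tables: Tables, full_refresh: bool) -> Tables:
    """Refresh every defined dataset once, in a topological order of `deps`."""
    if full_refresh:
        # Assumption: a full refresh empties pipeline datasets; sources are kept.
        tables = {n: ([] if n in DEFINITIONS else rows) for n, rows in tables.items()}
    else:
        tables = dict(tables)
    for name in TopologicalSorter(deps).static_order():
        if name in DEFINITIONS:  # source tables such as "transactions" are skipped
            tables[name] = DEFINITIONS[name](tables)
    return tables


# Each dataset maps to the datasets it reads from.
WIRED = {
    "description": [],
    "trans_with_description": ["transactions", "description"],
    "final_result": ["trans_with_description"],
}
# Same graph with final_result's edge missing ("detached" in the graph view).
DETACHED = {**WIRED, "final_result": []}

initial: Tables = {
    "transactions": [1, 2, 3, 4],
    "description": [],
    "trans_with_description": [],
    "final_result": [],
}

first = run_pipeline(DETACHED, initial, full_refresh=True)
second = run_pipeline(DETACHED, first, full_refresh=False)
wired = run_pipeline(WIRED, initial, full_refresh=True)
print(first["final_result"], second["final_result"], wired["final_result"])
```

With no edge, nothing forces final_result to wait for trans_with_description; in this run CPython's `graphlib` happens to schedule it first, so the detached full refresh leaves final_result empty while the following normal run fills it, and the wired graph is correct on the first run.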
- Is the "final_result" table a materialized view as well?
- How do you read/refer to the tables produced by, for example, file1/file2? If you can share a bit of the code where you refer to the MVs you produce, that would help.
9 hours ago
All the "objects" in the DLT pipeline are MVs.
To get content from an MV, I use spark.read.format("delta").table(<three level name to MV>).
"pipeline compute" is marked "in beta". Setting it to None likely uses what is in the current generally available release.