The issue stems from the interaction between the Change Data Feed (CDF) metadata columns (_change_type, _commit_version, _commit_timestamp) and the Delta Live Tables (DLT) library. Once you import the dlt module, the behavior of reading the CDF-enabled table changes, and the metadata columns are no longer present in the result.
To address this issue:

1. Understanding the Cause: By default, DLT pipelines enable CDF for better propagation of change data. However, when importing DLT, if the target table also contains columns that are reserved for CDF (_change_type, _commit_version, _commit_timestamp), the framework can skip exposing these reserved metadata columns due to conflicts or internal handling, as outlined in the relevant documentation.
2. Best Practice Adjustments:
   - Use the `except_column_list` parameter in `dlt.apply_changes()` (a sketch of this variant appears after the list), or filter out the columns explicitly in your code when dealing with append-only streaming tables. For example:
```python
import dlt

@dlt.table
def my_table():
    # Read the full change feed from the CDF-enabled table
    df = (
        spark.read
        .option("readChangeFeed", "true")
        .option("startingVersion", 0)
        .table("<path_of_CDF_enabled_table>")
    )
    # Drop the reserved CDF metadata columns before returning the DataFrame
    return df.drop("_change_type", "_commit_version", "_commit_timestamp")
```
This drops the reserved metadata columns from the DataFrame after the read, mitigating the problem.
   - Schema Management: Ensure these reserved column names are excluded or renamed in the source table when CDF is enabled, as conflicting column names can lead to ambiguity (see the rename sketch after the list).
3. General Steps:
   - Perform the initial read before importing DLT and save the schema if required for downstream operations.
   - Post-import, reconfigure your read logic to accommodate the absence of the columns or filter them out explicitly. (An end-to-end sketch of these steps follows below.)
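For reference, here is a minimal sketch of the `apply_changes()` variant from point 2. The target name (`target_table`), the source view (`cdc_source`), and the key column (`id`) are assumptions for illustration only; adjust them to your own pipeline and schema.

```python
import dlt
from pyspark.sql.functions import col, expr

# Illustrative target; "target_table" is an assumed name, not from your pipeline.
dlt.create_streaming_table("target_table")

@dlt.view
def cdc_source():
    # Stream the change feed from the CDF-enabled table (same placeholder as above),
    # keeping only the post-update image of each change.
    return (
        spark.readStream
        .option("readChangeFeed", "true")
        .option("startingVersion", 0)
        .table("<path_of_CDF_enabled_table>")
        .filter(col("_change_type") != "update_preimage")
    )

dlt.apply_changes(
    target="target_table",
    source="cdc_source",
    keys=["id"],                          # assumed primary-key column
    sequence_by=col("_commit_version"),   # order changes by commit version
    apply_as_deletes=expr("_change_type = 'delete'"),
    # Keep the reserved CDF metadata columns out of the target table's schema
    except_column_list=["_change_type", "_commit_version", "_commit_timestamp"],
)
```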
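For the schema-management point, a hedged sketch of renaming colliding columns before they land in the CDF-enabled source table. The table names and the src_* column names here are purely illustrative:

```python
# Assumes the Databricks-provided spark session; table names are placeholders.
raw_df = spark.table("<raw_source_table>")

# If the incoming data carries columns that collide with the reserved CDF names,
# rename them before writing to the CDF-enabled table.
cleaned_df = (
    raw_df
    .withColumnRenamed("_change_type", "src_change_type")
    .withColumnRenamed("_commit_version", "src_commit_version")
    .withColumnRenamed("_commit_timestamp", "src_commit_timestamp")
)

cleaned_df.write.format("delta").mode("append").saveAsTable("<path_of_CDF_enabled_table>")
```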
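And a minimal illustration of the general steps, assuming the Databricks-provided spark session and the same placeholder table name:

```python
# Step 1: read the change feed *before* importing dlt and capture the schema,
# which still includes the reserved metadata columns at this point.
cdf_df = (
    spark.read
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("<path_of_CDF_enabled_table>")
)
saved_schema = cdf_df.schema  # keep for downstream validation or casting

# Step 2: import dlt only afterwards, and filter the reserved columns defensively
# in any reads performed from here on.
import dlt

reserved = {"_change_type", "_commit_version", "_commit_timestamp"}
data_columns = [f.name for f in saved_schema.fields if f.name not in reserved]
```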
Hope this helps, Lou.