Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

CDF metadata columns are lost after importing dlt

Ru
New Contributor III

Hi Databricks Community,

 
I attempted to read the Change Feed from a CDF-enabled table. Initially, the correct table schema, including the metadata columns (_change_type, _commit_version, and _commit_timestamp), was returned as expected. However, after importing the dlt library and reading the changes again, the metadata columns were missing. Could you help me resolve this issue? Thank you in advance!
 
# Databricks notebook source
changeset_cols_before = (
    spark.read
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("<path_of_CDF_enabled_table>")
    .columns
)

# COMMAND ----------

import dlt

# COMMAND ----------

changeset_cols_after = (
    spark.read
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("<path_of_CDF_enabled_table>")
    .columns
)

# COMMAND ----------

missing_cols = [col for col in changeset_cols_before if col not in changeset_cols_after]
print(missing_cols)

# result: ['_change_type', '_commit_version', '_commit_timestamp']

 

 


BigRoux
Databricks Employee
The issue stems from the interaction between the Change Data Feed (CDF) metadata columns (_change_type, _commit_version, _commit_timestamp) and the Delta Live Tables (DLT) library. Once the dlt module is imported, the behaviour of reading a CDF-enabled table changes, and the metadata columns are no longer returned.
To address this issue:
  1. Understand the cause: DLT pipelines enable CDF by default for better propagation of change data. After dlt is imported, if the target table contains the columns reserved for CDF (_change_type, _commit_version, _commit_timestamp), the framework can skip exposing these reserved metadata columns due to conflicts or internal handling, as outlined in the relevant documentation.
  2. Best-practice adjustments:
    • Use the except_column_list parameter of dlt.apply_changes(), or filter the columns out explicitly in your code when dealing with append-only streaming tables. For example:

      @dlt.table
      def my_table():
          df = (
              spark.read
              .option("readChangeFeed", "true")
              .option("startingVersion", 0)
              .table("<path_of_CDF_enabled_table>")
          )
          return df.drop("_change_type", "_commit_version", "_commit_timestamp")

    This drops the reserved metadata columns from the DataFrame, mitigating the problem.
  3. Schema management: ensure these reserved column names are excluded or renamed in the source table when CDF is enabled, since conflicting column names can lead to ambiguity.
  4. General steps:
    • Perform the initial read before importing dlt and save the schema if it is required for downstream operations.
    • After the import, reconfigure your read logic to accommodate the absence of the columns, or filter them out explicitly.
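The "save the schema before importing dlt" step can be sketched in plain Python. The reserved column names come from this thread; the data column names (id, value) and the helper split_cdf_columns are assumptions for illustration only:

```python
# Reserved CDF metadata column names, as listed in this thread.
RESERVED_CDF_COLS = ("_change_type", "_commit_version", "_commit_timestamp")

def split_cdf_columns(columns):
    """Split a column list captured from the initial read (done BEFORE
    `import dlt`) into data columns and CDF metadata columns, so the
    metadata set survives even after the import changes what
    spark.read returns."""
    data = [c for c in columns if c not in RESERVED_CDF_COLS]
    meta = [c for c in columns if c in RESERVED_CDF_COLS]
    return data, meta

# Hypothetical column list captured by the first read in the notebook.
cols_before = ["id", "value", "_change_type", "_commit_version", "_commit_timestamp"]
data_cols, meta_cols = split_cdf_columns(cols_before)
# data_cols -> ["id", "value"]
# meta_cols -> ["_change_type", "_commit_version", "_commit_timestamp"]
```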
Hope this helps, Lou.

Ru
New Contributor III

Hi Lou,

 

Thank you for the explanation! In my case, I was reading a CDF table outside a DLT pipeline, and I need to import some functions from our shared ETL modules, which import the dlt library. However, the behaviour is altered simply by having the import present, even without using any dlt functionality.
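One possible mitigation, assuming the shared ETL helpers are only needed later in the notebook, is to defer their import until after the change feed has been read, so dlt's import-time side effects do not fire at the top of the notebook. A minimal sketch (lazy_import and the "shared_etl.helpers" module name are hypothetical, not part of dlt):

```python
import importlib
import sys

def lazy_import(module_name):
    """Import a module only on first use, keeping import-time side
    effects (such as dlt altering how the CDF metadata columns are
    exposed) out of the notebook's top-level scope."""
    if module_name in sys.modules:
        return sys.modules[module_name]
    return importlib.import_module(module_name)

# Hypothetical usage: read the change feed first, then pull in the
# shared ETL helpers (which import dlt) only when they are needed.
# etl = lazy_import("shared_etl.helpers")
```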
