Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT behaving differently with Python syntax vs SQL syntax when reading CDF

Puspak
New Contributor II

I was trying to read the CDF data of a table as a DLT materialized view.

It works fine with SQL syntax, reading all the columns of the source table along with the 3 CDF columns (_change_type, _commit_timestamp, _commit_version):

import dlt

@dlt.table()
def change_table():
    # Read the change feed of the source table starting at version 1
    df_change = spark.sql("SELECT * FROM table_changes('<source_table_name>', 1)")
    return df_change
 
But when I try the same with Python, it reads only the columns of the source table, leaving out the CDF columns (_change_type, _commit_timestamp, _commit_version):
@dlt.table()
def change_table():
    # Same read, but via the DataFrame reader with the readChangeFeed option
    df_change = (spark.read.option('readChangeFeed', 'True')
                 .option('startingVersion', 1)
                 .table('<source_table_name>'))
    return df_change
2 REPLIES

Puspak
New Contributor II

But the same Python code works fine when executed outside of a DLT pipeline. When I run the following in an interactive notebook, it returns the source columns plus the CDF columns, which is expected because I am using the readChangeFeed option while reading.

spark.read.option('readChangeFeed', 'True').option('startingVersion', 1).table('<source_table_name>')

The problem I described occurs only when the code is executed within a DLT pipeline, which is strange.
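A quick way to confirm this is to print the schema in the notebook; a minimal sketch, with the table name as a placeholder:

# Interactive notebook check (outside DLT): the CDF metadata columns
# _change_type, _commit_timestamp, _commit_version appear in the schema.
df_change = (spark.read.option('readChangeFeed', 'True')
             .option('startingVersion', 1)
             .table('<source_table_name>'))
df_change.printSchema()  # lists the source columns plus the three CDF columns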

mark_ott
Databricks Employee

When accessing Change Data Feed (CDF) data in Delta Live Tables (DLT), the behavior between SQL and Python APIs differs notably regarding CDF metadata columns—_change_type, _commit_timestamp, and _commit_version.

  • SQL Approach (using table_changes):
    The SQL syntax
    SELECT * FROM table_changes('<source_table_name>', 1)
    always returns all columns from the source table plus the three CDF columns.

  • Python API Approach (using .option('readChangeFeed','True')):
    When you use
    spark.read.option('readChangeFeed', 'True').option('startingVersion', 1).table('<source_table_name>')
    inside a DLT pipeline, the returned DataFrame often omits the three CDF columns and shows only the data columns of the source table. This difference in behavior is attributed to the underlying CDF implementation and how schema inference is handled for the Python API in DLT.


How to Access CDF Columns in Python

To ensure the CDF columns are included when reading from a Delta table using the Python API, explicitly reference the relevant columns:

df_change = (spark.read.option('readChangeFeed', 'True')
             .option('startingVersion', 1)
             .table('<source_table_name>'))

# In some environments, this reveals the CDF columns
df_change = df_change.select("*")

# Alternatively, explicitly select the columns if "*" doesn't work:
df_change = df_change.select("*", "_change_type", "_commit_timestamp", "_commit_version")

If the columns still do not appear after using .select("*"), use .select() naming the columns directly as above. Ensure your environment is running on Databricks Runtime 9.0 or newer and that CDF is properly enabled for the Delta table.
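For reference, CDF must be enabled on the source table via a table property before either table_changes or readChangeFeed returns change data; a minimal sketch, with the table name as a placeholder:

# Enable the change data feed on the source table (one-time table property).
spark.sql("""
    ALTER TABLE <source_table_name>
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")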


Workaround and Best Practices

  • Use the SQL interface (table_changes) if you need a direct, complete result including CDF columns in DLT materialized views.

  • For Python, verify the DataFrame schema with df_change.printSchema(). If the CDF columns are missing, switch to an explicit .select() or fall back to using a SQL query within Python (spark.sql("SELECT * FROM table_changes(...)")); see the sketch after this list.

  • This behavior is subject to Databricks and Delta Lake updates; always refer to the current documentation for feature changes.
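A minimal sketch of that SQL-within-Python fallback as a DLT table, with the source table name and starting version as placeholders:

import dlt

@dlt.table()
def change_table():
    # Delegate the CDF read to SQL so the three CDF metadata columns
    # (_change_type, _commit_timestamp, _commit_version) are included.
    return spark.sql("SELECT * FROM table_changes('<source_table_name>', 1)")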


Quick Table Comparison

Method                          | Data Columns | CDF Columns
--------------------------------|--------------|------------
table_changes (SQL)             | Yes          | Yes
spark.read option (Python API)  | Yes          | Usually No

To reliably capture CDF columns in DLT with Python, stick to the SQL-based approach, or check your DataFrame schema and select the columns explicitly. This limitation and its workaround are documented in Databricks.