ck7007
Contributor II

@ManoramTaparia The issue is that current_timestamp() makes your MV non-deterministic, forcing complete recomputes. Here's how to fix it:
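To make the non-determinism point concrete, here is a toy sketch in plain Python (no Spark, and not how the Databricks engine actually decides on refresh strategy). It assumes a hypothetical two-row source with a modified_date column and compares two refreshes of the same unchanged data: stamping rows with the refresh time changes every output row, while stamping them from a source column changes none.

```python
# Hypothetical source rows; modified_date stands in for a source timestamp column.
SOURCE_ROWS = [
    {"id": 1, "modified_date": "2024-01-01T00:00:00"},
    {"id": 2, "modified_date": "2024-01-02T00:00:00"},
]

def view_with_wall_clock(rows, now):
    """Stamp every row with the refresh time, like current_timestamp()."""
    return [{**r, "last_updated": now} for r in rows]

def view_with_source_column(rows, now):
    """Stamp every row from its own data; the refresh time is ignored."""
    return [{**r, "last_updated": r["modified_date"]} for r in rows]

def changed_rows(before, after):
    """Return rows whose output differs between two refreshes."""
    return [a for b, a in zip(before, after) if a != b]

# Two refreshes over the same, unchanged source data.
t1 = "2024-03-01T00:00:00"
t2 = "2024-03-02T00:00:00"

wall = changed_rows(view_with_wall_clock(SOURCE_ROWS, t1),
                    view_with_wall_clock(SOURCE_ROWS, t2))
src = changed_rows(view_with_source_column(SOURCE_ROWS, t1),
                   view_with_source_column(SOURCE_ROWS, t2))

print(len(wall), len(src))  # 2 0
```

With the wall-clock version, every row looks "changed" on each refresh even though the source did not move, which is why nothing can be reused.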

Solution: Use the Source Table's Change Tracking

Option 1: Leverage Source Table's Timestamp Column
import dlt
from pyspark.sql.functions import col

@dlt.table(
    name="my_materialized_view"
)
def my_mv():
    return (
        spark.readStream
        .option("readChangeFeed", "true")  # requires CDF enabled on source_table
        .table("source_table")
        .select(
            "*",
            col("_commit_timestamp").alias("last_updated")  # From CDF
        )
    )

Option 2: Track Changes with a High-Water Mark Filter
import dlt
from pyspark.sql.functions import col

@dlt.table(
    name="incremental_mv"
)
def incremental_mv():
    df = dlt.read("source_table")

    # Add deterministic timestamp from source data
    return df.select(
        "*",
        col("modified_date").alias("refresh_timestamp")  # Use source column
    ).filter(
        col("modified_date") > spark.conf.get("last_refresh_time", "1900-01-01")
    )

Option 3: Use DLT Change Data Feed

import dlt

@dlt.table(
    name="tracked_mv",
    table_properties={
        "delta.enableChangeDataFeed": "true"
    }
)
def tracked_mv():
    return dlt.read_stream("source_table")

# Query changes separately (outside the pipeline definition).
# last_version is a placeholder: the last table version you already processed.
changes_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", last_version) \
    .table("tracked_mv")

Key Points

  1. Never use current_timestamp() in MV definitions
  2. Use source table timestamps or change feeds for tracking
  3. Enable CDF on source tables for proper change tracking

The CDF approach (Option 3) gives you both incremental refresh AND change identification without forcing recomputes.
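Key point 3 assumes CDF is already on for the source. A minimal sketch of turning it on for an existing Delta table, assuming the table is called source_table as in the options above and that you run this in a notebook where spark is already defined. enable_cdf_sql is a hypothetical helper for illustration, not part of any Databricks API; the ALTER TABLE statement it builds is the standard Delta table property syntax.

```python
def enable_cdf_sql(table_name: str) -> str:
    """Build the ALTER TABLE statement that enables Change Data Feed on a Delta table."""
    return (
        f"ALTER TABLE {table_name} "
        "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
    )

stmt = enable_cdf_sql("source_table")
print(stmt)

# In a Databricks notebook (assumption: `spark` is the active SparkSession):
# spark.sql(stmt)
```

Keep in mind that the change feed only records changes made after the property is set, so earlier versions of the table will not be readable with readChangeFeed.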

Which source table timestamp columns are available?