@ManoramTaparia The issue is that `current_timestamp()` makes your MV non-deterministic, so every refresh forces a full recompute instead of an incremental one. Here's how to fix it:
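For context, this is the problematic pattern (a minimal sketch; `source_table` stands in for your actual source):

```python
import dlt
from pyspark.sql.functions import current_timestamp

@dlt.table(name="my_materialized_view")
def my_mv():
    # Anti-pattern: current_timestamp() returns a different value on every
    # evaluation, so the MV can never be refreshed incrementally.
    return (
        spark.read.table("source_table")
            .withColumn("last_updated", current_timestamp())
    )
```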
Solution: Use the Source Table's Change Tracking
Option 1: Leverage Source Table's Timestamp Column
```python
import dlt
from pyspark.sql.functions import col

@dlt.table(
    name="my_materialized_view"
)
def my_mv():
    return (
        spark.readStream
            .option("readChangeFeed", "true")
            .table("source_table")
            .select(
                "*",
                col("_commit_timestamp").alias("last_updated")  # from the CDF metadata
            )
    )
```
Option 2: Track Changes with Watermarking
```python
import dlt
from pyspark.sql.functions import col

@dlt.table(
    name="incremental_mv"
)
def incremental_mv():
    df = dlt.read("source_table")
    # Add a deterministic timestamp taken from the source data
    return df.select(
        "*",
        col("modified_date").alias("refresh_timestamp")  # use a source column
    ).filter(
        col("modified_date") > spark.conf.get("last_refresh_time", "1900-01-01")
    )
```
Option 3: Use DLT Change Data Feed
```python
import dlt

@dlt.table(
    name="tracked_mv",
    table_properties={
        "delta.enableChangeDataFeed": "true"
    }
)
def tracked_mv():
    return dlt.read_stream("source_table")
```

Then query the changes separately, outside the pipeline:

```python
# last_version: the last Delta version you have already processed
changes_df = (
    spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", last_version)
        .table("tracked_mv")
)
```
Key Points
- Never use `current_timestamp()` in MV definitions
- Use source table timestamps or change feeds for tracking
- Enable CDF on source tables for proper change tracking (see the snippet below)
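For an existing Delta source table, CDF can be switched on with a table property, for example:

```python
# Enable the Change Data Feed on an existing Delta table
spark.sql("""
    ALTER TABLE source_table
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
```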
The CDF approach (Option 3) gives you both incremental refresh AND change identification without forcing recomputes.
Which source table timestamp columns are available?