
Merge using DLT

Ramu1821
New Contributor II

I have a requirement where I need only the last 24 hours of data from my Delta table.

Let's call this the latest table.

This latest table should be in sync with the source,
so it should handle all updates and inserts along with deletes (if something gets deleted at the source, then it should be deleted at the target as well).

Is there any approach for this? We are looking for a solution in the context of DLT.
I am using create_auto_cdc, but it is not taking care of deletes.
apply_as_deletes only handles the incoming change data, not the data already existing in the target.




2 REPLIES

Brahmareddy
Esteemed Contributor

Hi Ramu1821,

How are you doing today? As per my understanding, for your use case of keeping only the last 24 hours of data in sync (including inserts, updates, and deletes) within a Delta Live Tables (DLT) pipeline, you're right that create_auto_cdc on a streaming table (created with create_streaming_table) together with apply_as_deletes = true only captures delete events when they arrive; it doesn't clean up older data already in the table.

A common approach here is two-step logic: first, capture the CDC events using apply_changes (or your custom logic) and write them to a staging table; then, in your final "latest" table, keep only records whose timestamp is within the last 24 hours. To handle deletes for records that didn't get new change events, you can run a scheduled batch process (or add logic in a DLT table running in triggered mode) that compares timestamps and removes anything older than 24 hours.

While DLT doesn't offer fully automatic "time-windowed retention" with delete tracking out of the box, combining CDC, timestamp-based filtering, and a scheduled cleanup can get you close to your goal. Let me know if you'd like a code example to get started!

Regards,

Brahma
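For anyone who wants a starting point, here is a minimal sketch of the two-step approach above, reusing the name and timestamp columns from the code later in this thread. The operation column used to flag deletes is an assumption about the shape of the CDC feed, spark is the session the DLT runtime provides, and the cleanup of stale rows is represented only by the 24-hour filter on the final table; treat this as a sketch rather than a tested pipeline.

import dlt
from pyspark.sql.functions import col, expr

# Step 1: stage the incoming change events from the source.
@dlt.view(name="cdc_staging")
def cdc_staging():
    return spark.readStream.table("LIVE.source")

# Step 2: apply the changes into a staging target, honoring delete events
# as they arrive (assumes an "operation" column in the feed).
dlt.create_streaming_table("latest_staged")

dlt.apply_changes(
    target="latest_staged",
    source="cdc_staging",
    keys=["name"],
    sequence_by=col("timestamp"),
    apply_as_deletes=expr("operation = 'DELETE'"),
)

# Step 3: publish only the last 24 hours; older rows fall out of this
# materialized result on each update, approximating time-windowed retention.
@dlt.table(name="latest_24h")
def latest_24h():
    return (
        dlt.read("latest_staged")
        .filter(col("timestamp") >= expr("current_timestamp() - interval 24 hours"))
    )

A separate scheduled job or triggered update would still be needed if rows older than 24 hours must also be physically removed from the staging target.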

Ramu1821
New Contributor II
import dlt
from typing import Optional, Tuple

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit, expr, to_timestamp, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType
 
# Define the source table
source_table = "LIVE.source"
 
# Define the target table
target_table = "target"
 
dlt.create_streaming_table(target_table)
 
 
 
schema = StructType([
    StructField("name", StringType(), True),
    StructField("rackName", StringType(), True),
    StructField("clusterName", StringType(), True)
])
 
 
 
@dlt.table(name="source_89939")
def source_89939():
    # Read from the source table
    df = spark.readStream.format("delta").option("skipChangeCommits", "true").table(source_table)
 
    # Filter data from the last 24 hours
    filtered_df = df.filter(
        (col("tenant") == lit("vdi")) &
        (col("rack") != lit("unknown")) &
        (to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'") >= expr("current_timestamp() - interval 24 hours"))
    )
 
    # Select the necessary columns
    selected_df = filtered_df.select(
        col("name").alias("name"),
        col("rack").alias("rackName"),
        col("hosts").alias("hosts"),
        to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'").alias("timestamp")
    )

    return selected_df
 
dlt.apply_changes(
    target=target_table,
    source="source_89939",
    keys=["name"],
    sequence_by=col("timestamp"),
    name="initial"
)

 

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
    # Read from the source table
    source_df = spark.read \
        .format("delta") \
        .option("ignoreDeletes", "true") \
        .table(source_table)

    # Read from the target table using dlt.read; referencing this dataset here,
    # outside a dataset query definition, is what triggers the exception below
    target_df = dlt.read("700025_ctg_dev.test_ramu.target")

    # Perform the join operation
    result_df = target_df.alias("t").join(
        source_df.alias("s"),
        on="name",
        how="left_anti"
    ).select(
        col("t.name").alias("name"),
        lit(None).alias("rackName"),
        current_timestamp().alias("timestamp")
    )

    # Return the latest snapshot version and the result DataFrame
    return (latest_snapshot_version, result_df)
 
dlt.create_auto_cdc_from_snapshot_flow(
  target = target_table,
  source = next_snapshot_and_version,
  keys = ["name"],
  track_history_column_list = None,
  track_history_except_column_list = None
)

The idea is to calculate a snapshot of the removed records and add them to the target to make DLT do a forceful delete.

I am getting this exception:

com.databricks.pipelines.common.errors.DLTSparkException: [REFERENCE_DLT_DATASET_OUTSIDE_QUERY_DEFINITION] Referencing DLT dataset `700025_ctg_dev`.`test_ramu`.`target` outside the dataset query definition (i.e., @dlt.table annotation) is not supported. Please read it instead inside the dataset query definition.
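The exception is raised because dlt.read() is called inside the snapshot callback, which is not a dataset query definition. One possible restructuring, sketched below under assumptions, is to read the published tables with spark.read.table() instead: the fully qualified source name is an assumption (the target name comes from the error message), and whether a snapshot flow should read its own target this way is something to validate for your pipeline.

from typing import Optional, Tuple

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit, current_timestamp

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
    # Plain batch reads; dlt.read() is only supported inside a @dlt.table /
    # @dlt.view query definition, which is what the exception reports.
    source_df = spark.read.table("700025_ctg_dev.test_ramu.source")  # assumed published source name
    target_df = spark.read.table("700025_ctg_dev.test_ramu.target")  # target name taken from the error

    # Keys present in the target but gone from the source become tombstone
    # rows so the snapshot flow can delete them.
    missing_df = (
        target_df.alias("t")
        .join(source_df.alias("s"), on="name", how="left_anti")
        .select(
            col("t.name").alias("name"),
            lit(None).alias("rackName"),
            current_timestamp().alias("timestamp"),
        )
    )

    return (latest_snapshot_version, missing_df)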

