Ramu1821
New Contributor II
from pyspark.sql.functions import col, lit, expr, when, to_timestamp, current_timestamp
from pyspark.sql.functions import max as max_
import dlt
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import col, struct, lit
from typing import Optional, Tuple
from pandas import DataFrame
 
# Dataset the pipeline reads from (referenced through the LIVE schema)
source_table = "LIVE.source"

# Name of the streaming table that the CDC flows write into
target_table = "target"

# Declare the target streaming table before any flow refers to it
dlt.create_streaming_table(target_table)

# Three nullable string fields: name, rackName, clusterName
schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("name", "rackName", "clusterName")
    ]
)
 
 
 
@dlt.table(name="source_89939")
def source_89939():
    """Stream recent "vdi" tenant rows from the source table.

    Reads ``source_table`` as a Delta stream, keeps rows whose ``tenant`` is
    ``"vdi"``, whose ``rack`` is not ``"unknown"`` and whose parsed
    ``timestamp`` falls within the last 24 hours, and projects them to the
    columns ``name``, ``rackName``, ``hosts`` and ``timestamp``.

    Returns:
        A streaming DataFrame with columns ``name``, ``rackName`` (from
        ``rack``), ``hosts`` and ``timestamp`` (parsed to a timestamp type).
    """
    # The decorator must use the imported module name `dlt`; `Dlt` is not
    # defined anywhere and would raise NameError when the file is loaded.

    # Parse the string timestamp once so the filter and the projection are
    # guaranteed to use the same pattern.
    event_ts = to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'")

    # Read from the source table as a stream; the skipChangeCommits option is
    # passed through to the Delta streaming reader.
    df = spark.readStream.format("delta").option("skipChangeCommits", "true").table(source_table)

    # Keep only vdi rows with a known rack from the last 24 hours
    filtered_df = df.filter(
        (col("tenant") == lit("vdi"))
        & (col("rack") != lit("unknown"))
        & (event_ts >= expr("current_timestamp() - interval 24 hours"))
    )

    # Select the necessary columns, renaming rack -> rackName
    return filtered_df.select(
        col("name").alias("name"),
        col("rack").alias("rackName"),
        col("hosts").alias("hosts"),
        event_ts.alias("timestamp"),
    )
 
# Flow "initial": apply changes from the source_89939 dataset to the target
# table, matching rows on `name` and ordering them by `timestamp`.
dlt.apply_changes(
    name="initial",
    target=target_table,
    source="source_89939",
    keys=["name"],
    sequence_by=col("timestamp"),
)

 

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
    """Build a DataFrame of names present in the target but absent from the source.

    Args:
        latest_snapshot_version: Value passed in by the caller; it is returned
            unchanged as the first element of the result tuple.

    Returns:
        A tuple ``(latest_snapshot_version, result_df)`` where ``result_df``
        has the columns ``name``, ``rackName`` (always NULL) and ``timestamp``
        (the current timestamp).

    NOTE(review): the ``DataFrame`` name in the annotation resolves to the
    pandas import at the top of the file, while the value returned is built
    from Spark reads -- confirm and annotate with the Spark DataFrame type.
    """
    # Read from the source table (batch read, not a stream)
    source_df = spark.read \
        .format("delta") \
        .option("ignoreDeletes", "true") \
        .table(source_table)

    # Read from the target table using dlt
    # NOTE(review): this call is what the reported
    # REFERENCE_DLT_DATASET_OUTSIDE_QUERY_DEFINITION error points at -- this
    # function is not a @dlt.table / @dlt.view dataset definition, so the DLT
    # dataset is being referenced outside a query definition. It also reads
    # the very table the snapshot flow writes to; confirm whether that
    # self-reference is supported at all.
    target_df =dlt.read("700025_ctg_dev.test_ramu.target")

    # Anti-join: keep target rows whose `name` has no match in the source
    result_df = target_df.alias("t").join(
        source_df.alias("s"),
        on="name",
        how="left_anti"
    ).select(
        col("t.name").alias("name"),
        lit(None).alias("rackName"),
        current_timestamp().alias("timestamp")
    )

    # Return the latest snapshot version and the result DataFrame
    # NOTE(review): the Databricks snapshot-CDC documentation appears to expect
    # (DataFrame, version) in that order, and None when no further snapshot is
    # available; here the order is (version, DataFrame), the version is never
    # advanced and None is never returned -- verify against the API reference.
    return (latest_snapshot_version, result_df)
 
# Register a snapshot-based CDC flow into the target table, keyed by `name`,
# with snapshots supplied by next_snapshot_and_version. Both history-tracking
# column lists are explicitly left unset.
dlt.create_auto_cdc_from_snapshot_flow(
    target=target_table,
    source=next_snapshot_and_version,
    keys=["name"],
    track_history_column_list=None,
    track_history_except_column_list=None,
)

The goal is to calculate a snapshot and add it to the target so that DLT performs a forced delete.

com.databricks.pipelines.common.errors.DLTSparkException: [REFERENCE_DLT_DATASET_OUTSIDE_QUERY_DEFINITION] Referencing DLT dataset `700025_ctg_dev`.`test_ramu`.`target` outside the dataset query definition (i.e., @dlt.table annotation) is not supported. Please read it instead inside the dataset query definition.

I am getting the exception shown above.