import dlt
from typing import Optional, Tuple
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit, expr, when, struct, to_timestamp, current_timestamp
from pyspark.sql.functions import max as max_
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.utils import AnalysisException
# Define the source table
source_table = "LIVE.source"
# Define the target table
target_table = "target"
dlt.create_streaming_table(target_table)
schema = StructType([
    StructField("name", StringType(), True),
    StructField("rackName", StringType(), True),
    StructField("clusterName", StringType(), True)
])
@dlt.table(name="source_89939")
def source_89939():
    # Read from the source table, skipping change commits so the stream only sees appends
    df = (
        spark.readStream.format("delta")
        .option("skipChangeCommits", "true")
        .table(source_table)
    )
    # Keep only vdi-tenant rows with a known rack from the last 24 hours
    filtered_df = df.filter(
        (col("tenant") == lit("vdi"))
        & (col("rack") != lit("unknown"))
        & (to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'") >= expr("current_timestamp() - interval 24 hours"))
    )
    # Select and rename the columns needed by the target
    selected_df = filtered_df.select(
        col("name").alias("name"),
        col("rack").alias("rackName"),
        col("hosts").alias("hosts"),
        to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'").alias("timestamp")
    )
    return selected_df
dlt.apply_changes(
    target=target_table,
    source="source_89939",
    keys=["name"],
    sequence_by=col("timestamp"),
    name="initial"
)
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
    # Read from the source table
    source_df = (
        spark.read
        .format("delta")
        .option("ignoreDeletes", "true")
        .table(source_table)
    )
    # Read from the target table using dlt.read -- this is the call that raises the exception quoted below
    target_df = dlt.read("700025_ctg_dev.test_ramu.target")
    # Keep target rows whose key no longer exists in the source (left anti join)
    result_df = target_df.alias("t").join(
        source_df.alias("s"),
        on="name",
        how="left_anti"
    ).select(
        col("t.name").alias("name"),
        lit(None).alias("rackName"),
        current_timestamp().alias("timestamp")
    )
    # Return the latest snapshot version and the result DataFrame
    return (latest_snapshot_version, result_df)
dlt.create_auto_cdc_from_snapshot_flow(
    target=target_table,
    source=next_snapshot_and_version,
    keys=["name"],
    track_history_column_list=None,
    track_history_except_column_list=None
)
I want to calculate a snapshot of keys that have disappeared from the source and feed it into the target so that DLT performs a forceful delete. When the pipeline runs, I get this exception:

com.databricks.pipelines.common.errors.DLTSparkException: [REFERENCE_DLT_DATASET_OUTSIDE_QUERY_DEFINITION] Referencing DLT dataset `700025_ctg_dev`.`test_ramu`.`target` outside the dataset query definition (i.e., @dlt.table annotation) is not supported. Please read it instead inside the dataset query definition.
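
One workaround I am considering (not tested yet) is to read the materialized target directly with spark.read.table instead of dlt.read, since next_snapshot_and_version runs outside any @dlt.table query definition. This is only a sketch with the dlt.read line swapped out; the fully qualified name is just the one taken from the error message, so it assumes the target is reachable under that catalog and schema:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
    source_df = (
        spark.read
        .format("delta")
        .option("ignoreDeletes", "true")
        .table(source_table)
    )
    # Assumption: read the already-materialized target by its Unity Catalog name
    # instead of dlt.read, because this helper is not a dataset query definition
    target_df = spark.read.table("700025_ctg_dev.test_ramu.target")
    # Target rows whose key no longer exists in the source become the delete snapshot
    result_df = target_df.alias("t").join(
        source_df.alias("s"), on="name", how="left_anti"
    ).select(
        col("t.name").alias("name"),
        lit(None).alias("rackName"),
        current_timestamp().alias("timestamp")
    )
    return (latest_snapshot_version, result_df)

Is reading the pipeline's own target this way supported, or is there a recommended pattern for making the snapshot flow delete rows that no longer exist in the source?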