@_singh_vish
DLT treats the DataFrame returned by each @dlt.table function as the complete, current state of that table. So when you define a DLT table with @dlt.table, whatever the function returns on a pipeline update replaces the previous data unless your logic is explicitly written to retain historical records.
That means even if your custom Spark logic prints or calculates the history correctly in a notebook, DLT will overwrite the target table unless the DataFrame you return from the transformation itself preserves that history.
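For example, a plain @dlt.table definition like the sketch below is recomputed from scratch on every pipeline update, so the table ends up holding only what the source contains at that moment (source_table is a placeholder name, and spark is the session DLT provides in the pipeline notebook):

import dlt
from pyspark.sql.functions import current_timestamp

# Recomputed in full on every update: rows that disappear from
# source_table also disappear from snapshot_table.
@dlt.table(name="snapshot_table")
def snapshot_table():
    return (
        spark.read.table("source_table")  # placeholder source
        .withColumn("ingest_time", current_timestamp())
    )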
You need to union your new records with existing table data within the transformation itself. Here's an outline of how you can do it:
Option 1: Append-based logic inside @dlt.table
import dlt
from pyspark.sql.functions import current_timestamp

@dlt.table(name="my_table_history")
def create_history():
    new_data = ...  # your custom Spark code to fetch current records
    try:
        # Read the table's previously published contents
        existing_data = dlt.read("my_table_history")
        # Logic to identify new/changed rows goes here, then keep old and new
        combined = existing_data.unionByName(
            new_data.withColumn("ingest_time", current_timestamp()),
            allowMissingColumns=True,
        )
    except Exception:
        # First run: the table might not exist yet
        combined = new_data.withColumn("ingest_time", current_timestamp())
    return combined
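If reading the table inside its own definition with dlt.read trips a circular-dependency check in your pipeline, a variation is to read the previously published table by its fully qualified name, which keeps the self-reference out of the DLT dependency graph. A minimal sketch, assuming the pipeline publishes to a known catalog and schema; my_catalog.my_schema.my_table_history is a placeholder for your actual target:

import dlt
from pyspark.sql.functions import current_timestamp

# Placeholder: the fully qualified name your pipeline publishes to
TARGET = "my_catalog.my_schema.my_table_history"

@dlt.table(name="my_table_history")
def create_history():
    new_data = ...  # your custom Spark code to fetch current records
    new_data = new_data.withColumn("ingest_time", current_timestamp())
    if spark.catalog.tableExists(TARGET):
        # Keep everything already published, then add this run's rows
        existing_data = spark.read.table(TARGET)
        return existing_data.unionByName(new_data, allowMissingColumns=True)
    # First run: the target table does not exist yet
    return new_data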
Option 2: Use @dlt.append_flow instead of @dlt.table
If your goal is simply to append every batch to a history table, you can declare the target as a streaming table and write to it with an append flow (@dlt.append_flow):
dlt.create_streaming_table("my_table_history")

@dlt.append_flow(target="my_table_history")
def insert_history():
    return your_spark_logic_df.withColumn("ingest_time", current_timestamp())
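Because this is an append flow, each pipeline update only adds the rows returned by insert_history to my_table_history rather than recomputing the whole table, and the ingest_time column lets you tell batches apart.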
LR