05-01-2025 10:10 AM
I am using the @dlt.table decorator to create a table that stores history for my tables.
My code looks like this:
```python
import dlt

@dlt.table(name="table_name")
def target():
    # custom Spark code to create history
    ...
```
The Spark code creates and prints the history when I run it in a normal notebook, but when I run it inside the pipeline it does not create history; it only writes the most recent record.
Can someone explain why this is happening and what I can improve, please?
05-01-2025 10:45 AM
DLT treats the DataFrame returned by an @dlt.table function as the complete contents of that table. When the function returns a batch (non-streaming) DataFrame, the table is a materialized view whose contents always match the query's latest result: on each pipeline update DLT refreshes it, so whatever the function returns on that run is all the table will contain, unless your logic is carefully implemented to retain historical records.
So even though your custom Spark logic computes and prints history in a normal notebook, inside the pipeline the target table only holds what the function returns on the latest update. Unless the returned DataFrame itself contains the full history, earlier records are lost.
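To make the difference concrete, here is a toy, pure-Python model of two refresh behaviors. It does not use the DLT API; the function names and the sample snapshots are hypothetical. A recomputed table ends up holding exactly the latest query result, while an append-only table keeps the rows from earlier runs.

```python
# Toy model (NOT the DLT API): how a recomputed table and an append-only
# table behave across successive pipeline updates.

def refresh_recomputed(table, query_result):
    """Materialized-view style: stored rows become exactly the query result."""
    table.clear()
    table.extend(query_result)


def refresh_append(table, new_rows):
    """Append-only style: new rows are added and earlier rows are kept."""
    table.extend(new_rows)


# Hypothetical source snapshots seen on three successive pipeline updates.
snapshots = [
    [{"id": 1, "status": "new"}],
    [{"id": 1, "status": "shipped"}],
    [{"id": 1, "status": "delivered"}],
]

recomputed, appended = [], []
for run, snapshot in enumerate(snapshots, start=1):
    refresh_recomputed(recomputed, snapshot)
    # Tag appended rows with the run number so each version stays distinguishable.
    refresh_append(appended, [dict(row, run=run) for row in snapshot])

print(len(recomputed), recomputed)  # only the latest snapshot survives
print(len(appended))                # every run's rows are kept
```

This is the behavior described in the question: the notebook logic may produce history, but a recomputed target only ever shows the last result.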
To keep history, earlier records have to be preserved explicitly instead of being recomputed from the current snapshot. Here is an outline of two ways to approach it:
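Option 1: Append-based logic inside @dlt.table
Caveat: DLT resolves dlt.read() calls into a dependency graph, and a table that reads its own output creates a cycle. A pipeline is likely to reject this outline when it builds that graph, rather than raising an error the try/except could catch, so treat it as an illustration of the union idea rather than a pipeline-ready pattern.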
```python
import dlt
from pyspark.sql.functions import current_timestamp

@dlt.table(name="my_table_history")
def create_history():
    new_data = ...  # your custom Spark code to fetch the current records
    stamped = new_data.withColumn("ingest_time", current_timestamp())
    try:
        # Caution: reading the table being defined is a self-dependency
        existing_data = dlt.read("my_table_history")
        # Logic to identify new/changed rows goes here
        combined = existing_data.unionByName(stamped, allowMissingColumns=True)
    except Exception:
        # First run: the table might not exist yet
        combined = stamped
    return combined
```
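The outline leaves "Logic to identify new/changed rows" unspecified. One common approach, sketched here in plain Python rather than Spark and assuming each row has a stable key column plus an ingest_time, is to compare each current row with the most recent recorded version of its key and keep only rows that are new or changed. The helper names, the id/status columns, and the sample data are hypothetical; in Spark the same idea would typically be expressed with a window function or a join.

```python
from datetime import datetime, timezone


def latest_versions(history, key):
    """Return {key_value: row} holding the most recent history row per key."""
    latest = {}
    for row in history:
        k = row[key]
        if k not in latest or row["ingest_time"] > latest[k]["ingest_time"]:
            latest[k] = row
    return latest


def rows_to_append(history, snapshot, key, tracked_cols, now):
    """Keep snapshot rows whose key is new or whose tracked columns changed."""
    latest = latest_versions(history, key)
    changed = []
    for row in snapshot:
        prev = latest.get(row[key])
        if prev is None or any(prev[c] != row[c] for c in tracked_cols):
            changed.append(dict(row, ingest_time=now))
    return changed


# Hypothetical history after a first run, and the snapshot seen on the next run.
day1 = datetime(2025, 5, 1, tzinfo=timezone.utc)
history = [
    {"id": 1, "status": "new", "ingest_time": day1},
    {"id": 2, "status": "new", "ingest_time": day1},
]
snapshot = [{"id": 1, "status": "shipped"}, {"id": 2, "status": "new"}, {"id": 3, "status": "new"}]

now = datetime(2025, 5, 2, tzinfo=timezone.utc)
new_rows = rows_to_append(history, snapshot, "id", ["status"], now)
history.extend(new_rows)  # id 1 (changed) and id 3 (new) are appended; id 2 is skipped
```

Appending only changed rows keeps the history compact; appending every snapshot unconditionally is simpler but records unchanged rows on every run.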
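Option 2: Use a streaming table with @dlt.append_flow instead of @dlt.table
If your goal is to append every batch to a history table, declare the target with dlt.create_streaming_table and feed it from an @dlt.append_flow function. The flow's query should return a streaming DataFrame (for example, one built from spark.readStream), so each pipeline update appends only newly arrived input and earlier rows are kept. Returning a streaming DataFrame from @dlt.table directly has the same append-only effect.
```python
import dlt
from pyspark.sql.functions import current_timestamp

dlt.create_streaming_table("my_table_history")

@dlt.append_flow(target="my_table_history")
def insert_history():
    # your_spark_logic_df must be a streaming DataFrame
    return your_spark_logic_df.withColumn("ingest_time", current_timestamp())
```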
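If you want history as one row per version with a validity range (a slowly changing dimension, Type 2) rather than a raw log of every batch, DLT also offers dlt.apply_changes(..., stored_as_scd_type=2); check the current Databricks documentation for the exact parameters in your runtime. The toy pure-Python model below only illustrates those semantics: the helper name and the seq/start/end fields are hypothetical, and ordering is simplified to sorting each incoming batch by seq (late-arriving changes older than existing history are not handled).

```python
def apply_scd2(history, changes, key, tracked_cols):
    """Toy SCD Type 2 merge: when a tracked column changes, close the open
    version of that key (end = change's seq) and open a new version."""
    for change in sorted(changes, key=lambda r: r["seq"]):
        # At most one open version (end is None) exists per key.
        open_row = next(
            (r for r in history if r[key] == change[key] and r["end"] is None),
            None,
        )
        if open_row is not None:
            if all(open_row[c] == change[c] for c in tracked_cols):
                continue  # nothing tracked changed; keep the current version
            open_row["end"] = change["seq"]
        new_row = {c: change[c] for c in [key, *tracked_cols]}
        new_row.update(start=change["seq"], end=None)
        history.append(new_row)
    return history


history = []
apply_scd2(history, [{"id": 1, "status": "new", "seq": 1}], "id", ["status"])
apply_scd2(
    history,
    [
        {"id": 1, "status": "shipped", "seq": 2},
        {"id": 1, "status": "shipped", "seq": 3},  # no change: ignored
        {"id": 2, "status": "new", "seq": 3},
    ],
    "id",
    ["status"],
)
```

After the second batch, id 1 has a closed "new" version and an open "shipped" version, and id 2 has a single open version.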
05-02-2025 09:47 AM
@lingareddy_Alva Thanks for the help.