Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Working of @DLT.table

_singh_vish
New Contributor III

I am using the @dlt.table decorator to create a table that will store history for my tables.

My code looks like this:

@dlt.table(name="table_name")
def target():
    # custom Spark code to build the history DataFrame
    ...

 

Even though the Spark code creates and prints the history when I run it in a normal notebook, inside the pipeline it does not build up history; it only writes the most recent record somehow.

Can someone tell me how exactly this is happening, and what I can improve, please?

1 ACCEPTED SOLUTION

Accepted Solutions

lingareddy_Alva
Honored Contributor II

@_singh_vish 

DLT treats the DataFrame returned by an @dlt.table function as the complete state of the table at that point in time. So when you define a table with @dlt.table, the returned DataFrame replaces the previous contents on every update unless your logic explicitly retains historical records.
That is why your custom Spark logic prints or calculates history correctly in a notebook, but inside the pipeline DLT overwrites the target table unless the returned DataFrame itself preserves the history.
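As a toy illustration (plain Python, no Spark or DLT — the two helper functions are hypothetical stand-ins for DLT's update behavior, not the real API), here is why a materialized @dlt.table ends up with only the latest snapshot while an append-style flow accumulates history:

```python
# Toy model of the two update behaviors (NOT the real DLT API):
# a materialized table is replaced wholesale by the function's result,
# while an append flow only adds new rows.

def refresh_materialized_view(table, build_snapshot):
    table.clear()                   # DLT discards the previous contents...
    table.extend(build_snapshot())  # ...and keeps only what is returned

def run_append_flow(table, build_new_rows):
    table.extend(build_new_rows())  # existing rows are retained

mv, history = [], []
for batch in (["r1"], ["r2"], ["r3"]):
    refresh_materialized_view(mv, lambda b=batch: b)
    run_append_flow(history, lambda b=batch: b)

print(mv)       # ['r3']  -> only the latest snapshot survives
print(history)  # ['r1', 'r2', 'r3']  -> history accumulates
```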

You need to union your new records with existing table data within the transformation itself. Here's an outline of how you can do it:
Option 1: Append-based logic inside @dlt.table

import dlt
from pyspark.sql.functions import current_timestamp

@dlt.table(name="my_table_history")
def create_history():
    new_data = ...  # your custom Spark code to fetch the current records

    try:
        # Read the previously materialized table directly; calling
        # dlt.read() here would create a circular dependency on the
        # very table being defined.
        existing_data = spark.read.table("my_table_history")
        # Logic to identify new/changed rows
        combined = existing_data.unionByName(
            new_data.withColumn("ingest_time", current_timestamp()),
            allowMissingColumns=True,
        )
    except Exception:
        # First run: the table does not exist yet
        combined = new_data.withColumn("ingest_time", current_timestamp())

    return combined

Option 2: Use an append flow instead of @dlt.table

If your goal is to append every batch to a history table, define a streaming table and write to it with an append flow (the DLT Python API exposes this as @dlt.append_flow; there is no @dlt.append decorator):

dlt.create_streaming_table(name="my_table_history")

@dlt.append_flow(target="my_table_history")
def insert_history():
    # your_spark_logic_df should be a streaming DataFrame
    return your_spark_logic_df.withColumn("ingest_time", current_timestamp())
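It may also be worth mentioning that DLT has a built-in mechanism for keeping history: apply_changes with stored_as_scd_type=2 retains a full change history (SCD Type 2) automatically. A sketch under assumptions — the source table name my_source, key column id, and ordering column updated_at are placeholders for your own data, and this fragment only runs inside a DLT pipeline, where spark and dlt are provided:

```
import dlt
from pyspark.sql.functions import col

@dlt.view(name="history_source")
def history_source():
    # placeholder: stream the records whose changes you want to track
    return spark.readStream.table("my_source")

dlt.create_streaming_table(name="my_table_history")

dlt.apply_changes(
    target="my_table_history",
    source="history_source",
    keys=["id"],                    # assumed primary-key column
    sequence_by=col("updated_at"),  # assumed ordering column
    stored_as_scd_type=2,           # retain full change history
)
```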

 

LR


2 REPLIES 2


@lingareddy_Alva Thanks for the help.
