Add CreatedDate to Delta Live Table
08-15-2024 03:02 PM
Hi all,
I have a very simple DLT set up using the following code:
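import dlt
from pyspark.sql.functions import col

# Auto Loader view over the incoming CSV files
@dlt.view(
    name="view_name",
    comment="comments"
)
def vw_DLT():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load(file_location)
    )

# Target streaming table
dlt.create_streaming_table(
    name="table_name",
    comment="comments"
)

# Upsert changes from the view into the target, ordered by Date
dlt.apply_changes(
    target="table_name",
    source="view_name",
    keys=["id"],
    sequence_by=col("Date")
)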
I would like to add an extra column called CreatedDate that records when each record was first inserted into the table. I can't put withColumn("CreatedDate", current_timestamp()) into vw_DLT() above, because that value would be overwritten on every load. So I've tried a couple of things:
First, I added a schema to dlt.create_streaming_table that includes CreatedDate TIMESTAMP GENERATED ALWAYS AS (NOW()), but that fails with this error when I run the DLT pipeline:
Secondly, in a separate notebook, I was able to:
1. Alter the __apply_changes_storage_ table to add the CreatedDate column.
2. Alter the CreatedDate column to set its default to current_timestamp().
3. Update the existing records to current_timestamp() manually.
All of this works in a notebook: when I run the pipeline again, new records get a new CreatedDate value, and old records keep the value they already had. But I'm not able to run the notebook above as part of the DLT job, since I get this error:
If that worked, all I'd need to do is run the notebook once after the initial load, and the rest would work fine.
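For readers following along, the three notebook steps above could be sketched as SQL statements built in Python. This is only an illustration: the helper name created_date_backfill_sql and the example table name are hypothetical, the backing table name from the post is not reproduced, and the WHERE ... IS NULL guard is an addition that makes the backfill safe to re-run. Delta may also require the column-defaults table feature to be enabled before SET DEFAULT is accepted, which this sketch does not handle.

```python
from typing import List


def created_date_backfill_sql(table_name: str, column: str = "CreatedDate") -> List[str]:
    """Build the three statements described in the steps above, in order.

    `table_name` is whatever backing table you are altering (hypothetical
    parameter; the real name depends on your pipeline).
    """
    return [
        # Step 1: add the new column to the table
        f"ALTER TABLE {table_name} ADD COLUMN {column} TIMESTAMP",
        # Step 2: give the column a default so future inserts are stamped
        f"ALTER TABLE {table_name} ALTER COLUMN {column} "
        f"SET DEFAULT current_timestamp()",
        # Step 3: backfill rows that existed before the column was added
        # (the IS NULL guard is an assumption, not part of the original steps)
        f"UPDATE {table_name} SET {column} = current_timestamp() "
        f"WHERE {column} IS NULL",
    ]


if __name__ == "__main__":
    # Placeholder table name; in a notebook each statement would be passed to spark.sql(...)
    for stmt in created_date_backfill_sql("my_catalog.my_schema.my_backing_table"):
        print(stmt)
```

Running this in a notebook outside the pipeline matches what the post describes; it does not address the error raised when the same statements run inside the DLT job.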
12-06-2024 10:45 PM
To add a CreatedDate column that captures the timestamp when a record is first inserted into the table, you can modify your Delta Live Tables (DLT) pipeline setup as follows:
1) Define the schema for your streaming table to include the CreatedDate column. This column will be populated with the current timestamp when a new record is inserted.
dlt.create_streaming_table(
    name="table_name",
    comment="comments",
    schema="""
        id STRING,
        Date TIMESTAMP,
        CreatedDate TIMESTAMP
    """
)
2) Ensure that the apply_changes function does not overwrite the CreatedDate column on subsequent updates.
dlt.apply_changes(
    target="table_name",
    source="view_name",
    keys=["id"],
    sequence_by=col("Date"),
    except_column_list=["CreatedDate"]
)
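The behaviour that step 2 is aiming for (stamp CreatedDate on first insert, keep it on later updates) can be modelled in plain Python, independent of DLT. This is a toy model of the desired semantics only. It does not show that except_column_list produces this behaviour, and the function name upsert_preserving_created and the "v" payload field are hypothetical.

```python
from datetime import datetime
from typing import Any, Dict

Row = Dict[str, Any]


def upsert_preserving_created(table: Dict[str, Row], incoming: Row, now: datetime) -> None:
    """Upsert `incoming` by its "id", ordered by "Date", keeping the first CreatedDate."""
    key = incoming["id"]
    existing = table.get(key)
    if existing is None:
        # First time this key is seen: stamp CreatedDate with the load time
        table[key] = {**incoming, "CreatedDate": now}
    elif incoming["Date"] >= existing["Date"]:
        # Newer change: overwrite the other columns but carry CreatedDate forward
        table[key] = {**incoming, "CreatedDate": existing["CreatedDate"]}
    # Otherwise the change is older than what is stored and is ignored


if __name__ == "__main__":
    table: Dict[str, Row] = {}
    first_load = datetime(2024, 8, 15, 15, 0)
    second_load = datetime(2024, 8, 16, 9, 0)
    upsert_preserving_created(table, {"id": "a", "Date": datetime(2024, 8, 1), "v": 1}, first_load)
    upsert_preserving_created(table, {"id": "a", "Date": datetime(2024, 8, 2), "v": 2}, second_load)
    print(table["a"])
```

After the second load, the row carries the updated payload but still has the CreatedDate from the first load, which is the outcome the original question asks for.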
12-08-2024 08:22 AM
Hi Mounika,
Why would defining the CreatedDate in the schema automatically mean "the column will be populated with the current timestamp when a new record is inserted"?

