Hi all,
I have a very simple DLT pipeline set up using the following code:
import dlt
from pyspark.sql.functions import col

@dlt.view(
    name="view_name",
    comment="comments"
)
def vw_DLT():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load(file_location)
    )

dlt.create_streaming_table(
    name="table_name",
    comment="comments"
)

dlt.apply_changes(
    target="table_name",
    source="view_name",
    keys=["id"],
    sequence_by=col("Date")
)
I would like to add an extra column called CreatedDate, recording when each record was first inserted into the table. I can't just put .withColumn("CreatedDate", current_timestamp()) into vw_DLT() above, because that value would be recomputed on every load.
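For reference, this is roughly the version I mean (my reconstruction, and it is NOT what I want):

from pyspark.sql.functions import current_timestamp

@dlt.view(name="view_name", comment="comments")
def vw_DLT():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load(file_location)
        # current_timestamp() is evaluated on every pipeline run, so an
        # updated record gets a fresh CreatedDate instead of keeping the
        # original insert time.
        .withColumn("CreatedDate", current_timestamp())
    )

So I've tried a couple of things instead.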
First I added a schema to dlt.create_streaming_table that includes CreatedDate TIMESTAMP GENERATED ALWAYS AS (NOW()), but that fails with an error when I run the DLT pipeline.
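This is roughly what that attempt looked like; the id and Date columns are just stand-ins for my real schema, and the CreatedDate line is the one the pipeline rejects:

dlt.create_streaming_table(
    name="table_name",
    comment="comments",
    schema="""
        id INT,
        Date TIMESTAMP,
        CreatedDate TIMESTAMP GENERATED ALWAYS AS (NOW())
    """
)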
Secondly, working directly against the backing table, I was able to (roughly the SQL is sketched below):
1. Alter the __apply_changes_storage_ table to add the CreatedDate column
2. Alter the CreatedDate column to set a default of current_timestamp()
3. Manually update the existing records to current_timestamp()
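A sketch of those three steps as I ran them in the notebook. The full table name here is hypothetical (the real backing table is __apply_changes_storage_ plus the target table name), and I'm assuming the column-defaults table feature isn't already enabled on it:

# Hypothetical name; the real backing table is
# "__apply_changes_storage_" + the target table name.
backing = "my_schema.__apply_changes_storage_table_name"

# Step 1: add the column.
spark.sql(f"ALTER TABLE {backing} ADD COLUMN CreatedDate TIMESTAMP")

# Column defaults are a Delta table feature that may need enabling first
# (assumption: it isn't already on for this table).
spark.sql(f"""
    ALTER TABLE {backing}
    SET TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported')
""")

# Step 2: default future inserts to the insert-time timestamp.
spark.sql(f"ALTER TABLE {backing} ALTER COLUMN CreatedDate SET DEFAULT current_timestamp()")

# Step 3: backfill rows that existed before the column was added.
spark.sql(f"UPDATE {backing} SET CreatedDate = current_timestamp() WHERE CreatedDate IS NULL")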
All of this works in a notebook: when I run the pipeline again, the new records do get a CreatedDate value, and the old records keep theirs. But I'm not able to run that notebook as part of the DLT job, because it fails with an error.
If that worked, all I'd need to do is run the notebook once after the initial load, and the rest would work fine.