Hi, I'm creating a DLT pipeline that uses DLT CDC to implement SCD Type 1, taking the latest record per key using a datetime column. This works with no issues:
import dlt
from pyspark.sql.functions import col

# Streaming view over the raw source data
@dlt.view
def users():
    return spark.readStream.table("source_table")

# Target table populated via apply_changes, keeping only the latest row per Id
dlt.create_streaming_table("target_table")

dlt.apply_changes(
    target = "target_table",
    source = "users",  # the view defined above
    keys = ["Id"],
    sequence_by = col("PublishDateTime"),
    stored_as_scd_type = 1
)
This gives me the following result:
SOURCE
Id  | DateTime
123 | 100424 1717
123 | 100424 1710
164 | 100424 1704
167 | 100424 1619

TARGET
Id  | DateTime
123 | 100424 1717
164 | 100424 1704
167 | 100424 1619
Essentially, this keeps the latest record per Id, ordered by the DateTime field.
My question now is: how do I edit this code to take the LATEST record PER DAY? Please see below an example using the same table:
SOURCE
Id  | DateTime    | Date
123 | 100424 1717 | 100424
123 | 100424 1710 | 100424
123 | 110424 1717 | 110424
164 | 100424 1704 | 100424
164 | 110424 1728 | 110424
165 | 120424 1447 | 120424
165 | 120424 1316 | 120424

TARGET
Id  | DateTime    | Date
123 | 100424 1717 | 100424
123 | 110424 1717 | 110424
164 | 100424 1704 | 100424
164 | 110424 1728 | 110424
165 | 120424 1447 | 120424
As you can see, the target table keeps the latest record per Id for EACH DAY, not just the single latest record overall.
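One idea I had is to add the Date column to the keys so that apply_changes treats (Id, Date) as a composite key and keeps one row per Id per day. Here's a minimal sketch of that, assuming Date can be derived from PublishDateTime (the view and target names here are my placeholders), but I'm not sure it's the right approach:

import dlt
from pyspark.sql.functions import col, to_date

# Hypothetical view that derives a Date column from the sequencing timestamp
@dlt.view
def users_daily():
    return (
        spark.readStream.table("source_table")
        .withColumn("Date", to_date(col("PublishDateTime")))
    )

dlt.create_streaming_table("target_table_daily")

dlt.apply_changes(
    target = "target_table_daily",
    source = "users_daily",
    keys = ["Id", "Date"],            # composite key: one row per Id per day
    sequence_by = col("PublishDateTime"),
    stored_as_scd_type = 1            # still SCD1, so only the latest row per key survives
)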
I'm aware SCD Type 1 does not capture history, so it may not be the right option here; SCD Type 2 does, but I'm unsure how to implement it. My rough understanding of the SCD Type 2 declaration is below. Would be eternally grateful for any advice here, thanks!
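For reference, this is the SCD Type 2 variant as I understand it (the target name is my placeholder). My understanding is that DLT then tracks history via generated __START_AT / __END_AT columns rather than overwriting, though I don't think this alone produces the one-row-per-day shape shown above:

import dlt
from pyspark.sql.functions import col

dlt.create_streaming_table("target_table_history")

dlt.apply_changes(
    target = "target_table_history",
    source = "users",                 # same view as in the working pipeline
    keys = ["Id"],
    sequence_by = col("PublishDateTime"),
    stored_as_scd_type = 2            # keeps every version instead of just the latest
)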