
DLT CDC/SCD - Taking the latest ID per day

dm7
New Contributor

Hi, I'm creating a DLT pipeline that uses DLT CDC to implement SCD Type 1, taking the latest record per Id using a datetime column. This works with no issues:

import dlt
from pyspark.sql.functions import col

@dlt.view
def users():
  return spark.readStream.table("source_table")

dlt.create_streaming_table("target_table")

dlt.apply_changes(
  target = "target_table",
  source = "users",  # the streaming view defined above
  keys = ["Id"],
  sequence_by = col("PublishDateTime"),
  stored_as_scd_type = 1
)

This gives me the following result:

SOURCE

Id   DateTime
123  100424 1717
123  100424 1710
164  100424 1704
167  100424 1619

TARGET

Id   DateTime
123  100424 1717
164  100424 1704
167  100424 1619

Essentially, it takes the latest record using the Id and DateTime fields.

My question now is: how do I edit this code to take the latest record PER DAY? Please see below an example using the same table:

SOURCE

Id   DateTime     Date
123  100424 1717  100424
123  100424 1710  100424
123  110424 1717  110424
164  100424 1704  100424
164  110424 1728  110424
165  120424 1447  120424
165  120424 1316  120424

TARGET

Id   DateTime     Date
123  100424 1717  100424
123  110424 1717  110424
164  100424 1704  100424
164  110424 1728  110424
165  120424 1447  120424

As you can see, the target table keeps the latest record per Id based on DateTime, but for EACH DAY rather than only the most recent one overall.

I'm aware SCD Type 1 does not capture history, so it may not be the right option here; SCD Type 2 does, but I'm unsure how to implement it. I would be eternally grateful for any advice. Thanks!
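One way to express this per-day requirement with the same apply_changes API is to include the Date column in the keys, so each (Id, Date) pair becomes its own row and SCD Type 1 keeps only the latest record within each day. A minimal sketch, assuming the source exposes Id, DateTime, and Date columns as in the example tables:

import dlt
from pyspark.sql.functions import col

@dlt.view
def users():
    return spark.readStream.table("source_table")

dlt.create_streaming_table("target_table")

dlt.apply_changes(
    target = "target_table",
    source = "users",
    keys = ["Id", "Date"],          # one row per Id per day
    sequence_by = col("DateTime"),  # latest DateTime wins within each day
    stored_as_scd_type = 1
)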

1 REPLY

Kaniz
Community Manager

Hi @dm7, Thank you for providing the details of your DLT pipeline and the desired outcome!

It looks like you’re trying to implement a Slowly Changing Dimension (SCD) Type 2 behaviour where you want to capture historical changes over time.

Let’s break down the problem and discuss how you can achieve this.

  1. Understanding the Problem:

    • You have a source table with records containing an Id, DateTime, and Date.
    • You want to create a target table that captures the latest record for each Id per day (Date).
  2. Approach:

    • Since SCD Type 1 doesn’t capture historical changes, we’ll need to use SCD Type 2.
    • In SCD Type 2, we maintain historical versions of records by introducing additional columns (e.g., ValidFrom, ValidTo, and IsCurrent). DLT can also maintain such columns for you; see the sketch after the notes below.
    • We’ll update the existing records in the target table and insert new records when changes occur.
  3. Implementation:

    • Here’s how you can modify your code to achieve the desired behavior:
import dlt
from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc, lit, row_number

@dlt.view
def users():
    return spark.readStream.table("source_table")

dlt.create_streaming_table("target_table")

# Assuming you have a DataFrame with the source data
source_df = users()

# Add columns for the range during which each record is valid
source_df = source_df.withColumn("ValidFrom", col("DateTime"))  # valid from the record's own timestamp
source_df = source_df.withColumn("ValidTo", lit("9999-12-31"))  # open-ended until superseded

# Window function to find the latest record per Id per day.
# Note: row_number() over a non-time window is not supported on a streaming
# DataFrame, so this step has to run in a batch context (e.g., inside
# foreachBatch); see the notes below.
window_spec = Window.partitionBy("Id", "Date").orderBy(desc("DateTime"))
source_df = source_df.withColumn("row_num", row_number().over(window_spec))

# Filter only the latest record for each Id and Date
latest_records_df = source_df.where(col("row_num") == 1).drop("row_num")

# Write the latest records to the target table
latest_records_df.writeStream.outputMode("update").table("target_table")
  4. Explanation:

    • We add ValidFrom and ValidTo columns to track the date range during which a record is valid.
    • The window function finds the latest record per Id and Date; rows superseded within a day are filtered out.
    • A fuller SCD Type 2 implementation would also close out the ValidTo of the previous latest record when a new one arrives.
    • Finally, we write the latest records to the target table.
  5. Notes:

    • Make sure to adjust the column names and data types according to your actual schema.
    • You may need to handle initial setup (e.g., setting valid dates for existing records).
    • Consider using a batch process to handle historical data before starting the streaming process; see the backfill sketch at the end of this reply.
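As referenced in the approach above, DLT can also maintain the validity columns itself: apply_changes accepts stored_as_scd_type = 2 and keeps history in automatically generated __START_AT and __END_AT columns. A minimal sketch under the same assumed schema (Id, DateTime, Date), where target_table_scd2 is a hypothetical table name; including Date in the keys keeps one history per Id per day:

import dlt
from pyspark.sql.functions import col

@dlt.view
def users():
    return spark.readStream.table("source_table")

dlt.create_streaming_table("target_table_scd2")

dlt.apply_changes(
    target = "target_table_scd2",
    source = "users",
    keys = ["Id", "Date"],          # track history per Id per day
    sequence_by = col("DateTime"),  # ordering column for change events
    stored_as_scd_type = 2          # DLT maintains __START_AT / __END_AT for you
)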

I hope this helps! If you have any further questions or need additional clarification, feel free to ask. 😊
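On the last note above, a one-off batch backfill could look like the sketch below. It assumes batch read access to source_table; target_table_backfill is a hypothetical staging table name, and the overwrite write should run only before the streaming pipeline starts. Because this runs in batch, the row_number() window is fully supported:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc, row_number

spark = SparkSession.builder.getOrCreate()

# Read the full history as a plain batch DataFrame
history = spark.read.table("source_table")

# Keep only the latest record per Id per day
window_spec = Window.partitionBy("Id", "Date").orderBy(desc("DateTime"))
latest = (history
          .withColumn("row_num", row_number().over(window_spec))
          .where(col("row_num") == 1)
          .drop("row_num"))

# Seed the staging table with the deduplicated history
latest.write.mode("overwrite").saveAsTable("target_table_backfill")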