
Delta Live Tables Slowly Changing Dimensions Type 2 with Joins

HowardLJ
New Contributor

Hi,

I may be missing something really obvious here. The organisation I work for has recently started using Delta Live Tables in Databricks for data modelling. One of the dimensions I am trying to model takes data from 3 existing tables in our data lake. It also needs to be a type 2 slowly changing dimension. The tables sit in a bronze, streaming layer (we will run this from a silver streaming layer in the future, but this is a proof of concept). The issue I'm running into is that, because the data comes from 3 tables, it doesn't work if changes occur in any table other than the first one (person). My current code is like this:

import dlt
from pyspark.sql.functions import col, expr, greatest, lit

@dlt.view
def dim_person():
    person_df = spark.readStream.table("raw_person") \
        .filter("person_type_id = 2") \
        .alias("person")

    # Gets people from the person_detail table, and replaces any blank job ids with -1
    person_detail_df = spark.readStream.table("raw_person_detail") \
        .na.fill(value=-1, subset=["job_id"]) \
        .select("person_id", "job_id", "workarea", "_Operation", "_IngestedDate") \
        .alias("person_detail")

    jobs_df = spark.readStream.table("raw_jobs") \
        .select("job_id", "name", "_Operation", "_IngestedDate")

    # Unions the jobs with a table that contains a value of 'Unknown', to join to
    # records of -1
    jobs_full_df = jobs_df.union(
        spark.readStream.table("staging_data_jobs")
            .withColumn("_IngestedDate", lit("2023-11-13").cast("date"))
    ).alias("jobs")

    full_df = person_df.join(person_detail_df, on="person_id") \
        .join(jobs_full_df, on="job_id") \
        .selectExpr(
            "person.person_id", "person.first_name", "person.surname",
            "person.date_of_birth", "person.postcode", "jobs.name AS job",
            "person_detail.workarea", "person._Operation",
            "person_detail._IngestedDate AS CDIngestedDate",
            "jobs._IngestedDate AS OCIngestedDate",
            "person._IngestedDate AS CUIngestedDate"
        ) \
        .withColumn("_IngestedDate", greatest("CDIngestedDate", "OCIngestedDate", "CUIngestedDate"))

    return full_df.select("person_id", "first_name", "surname", "date_of_birth",
                          "postcode", "job", "workarea", "_Operation", "_IngestedDate")

dlt.create_streaming_table("person")

dlt.apply_changes(
  target = "person",
  source = "dim_person",
  keys = ["person_id"],
  sequence_by = col("_IngestedDate"),
  apply_as_deletes = expr("_Operation = 1"),
  except_column_list = ["_IngestedDate"],
  stored_as_scd_type = 2
)

If I run this through a pipeline and, for example, one of the job IDs has been updated in the person_detail table, it breaks because the schema has changed. Is there a better way of doing SCD type 2 dimensions when joining multiple tables?

Any suggestions much appreciated.

1 REPLY

the_real_merca
New Contributor II

Could it be because the default join is `inner`, which means the row must exist in both tables?
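
For illustration only, here is a minimal sketch of what switching those joins to left joins could look like inside dim_person() from the question. It reuses the DataFrame names from the original code; the coalesce fallback to 'Unknown' is an assumption, not something from the original post.

from pyspark.sql.functions import coalesce, col, lit

# Replaces the two inner joins in dim_person() from the question.
# how="left" keeps every person row even when no matching
# person_detail or jobs row exists (yet); the job name falls back
# to 'Unknown' instead of dropping the row.
full_df = person_df.join(person_detail_df, on="person_id", how="left") \
    .join(jobs_full_df, on="job_id", how="left") \
    .withColumn("job", coalesce(col("jobs.name"), lit("Unknown")))

One caveat: outer joins between two streams in Structured Streaming generally require watermarks on both sides, so the source reads may need withWatermark calls as well.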
