Showing results for 
Search instead for 
Did you mean: 
Community Platform Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.
Showing results for 
Search instead for 
Did you mean: 

Delta Live Tables Slowly Changing Dimensions Type 2 with Joins

New Contributor


I may be missing something really obvious here. The organisation I work for has started using Delta Live Tables in Databricks for data modelling, recently. One of the dimensions I am trying to model takes data from 3 existing tables in our data lake. It also needs to be a type 2 slowly changing dimension. The tables sit in a bronze, streaming layer (we will run this from a silver streaming layer in the future, but this is a proof-of-concept). The issue I'm coming across, is that because the data is coming from 3 tables, it doesn't work if changes occur to any tables that aren't the first one (person). My current code is like this:



def dim_person():
    person_df = spark.readStream.table("raw_person") \
    .filter("person_type_id = 2") \
    #gets people from the person_detail table, and replaces any blank job ids with -1
    person_detail_df = spark.readStream.table("raw_person_detail") \
    .na.fill(value=-1, subset=["job_id"]) \
    .select("person_id", "job_id", "workarea", "_Operation", "_IngestedDate") \
    jobs_df = spark.readStream.table("raw_jobs") \
    .select("job_id", "name", "_Operation", "_IngestedDate")

    #unions the jobs with a table that contains a value of 'Unknown', to join to records 
    #of -1
    jobs_full_df = jobs_df.union(spark.readStream.table("staging_data_jobs").withColumn("_IngestedDate", lit("2023-11-13").cast("date"))) \

    full_df = person_df.join(person_detail_df, on="person_id") \
                    .join(jobs_full_df, on="job_id") \
                    .selectExpr("person.person_id","person.first_name","person.surname","person.date_of_birth","person.postcode"," as job","person_detail.workarea","person._Operation", "person_detail._IngestedDate AS CDIngestedDate", "jobs._IngestedDate AS OCIngestedDate", "person._IngestedDate AS CUIngestedDate") \
                    .withColumn("_IngestedDate", greatest(*["CDIngestedDate", "OCIngestedDate", "CUIngestedDate"]))


  target = "person",
  source = "dim_person",
  keys = ["person_id"],
  sequence_by = col("_IngestedDate"),
  apply_as_deletes = expr("_Operation = 1"),
  except_column_list = ["_IngestedDate"],
  stored_as_scd_type = 2

If I run this through a pipeline, and for example, one of the job IDs has been updated in the person_details table, it will break, because the schema has changed. Is there a better way of doing SCD type 2 dimensions when joining multiple tables? 

Any suggestions much appreciated.


New Contributor II

Can it be because the default join is `inner` and that means the row must exists in both tables

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group