Hi,
I may be missing something really obvious here. The organisation I work for has recently started using Delta Live Tables in Databricks for data modelling. One of the dimensions I am trying to model takes data from three existing tables in our data lake, and it needs to be a type 2 slowly changing dimension. The tables sit in a bronze, streaming layer (we will run this from a silver streaming layer in the future, but this is a proof of concept). The issue I'm coming across is that, because the data comes from three tables, the pipeline doesn't work if changes occur to any table other than the first one (person). My current code looks like this:
import dlt
from pyspark.sql.functions import col, expr, lit, greatest

@dlt.view
def dim_person():
    # People of the relevant type from the person table
    person_df = spark.readStream.table("raw_person") \
        .filter("person_type_id = 2") \
        .alias("person")

    # Gets people from the person_detail table, replacing any blank job ids with -1
    person_detail_df = spark.readStream.table("raw_person_detail") \
        .na.fill(value=-1, subset=["job_id"]) \
        .select("person_id", "job_id", "workarea", "_Operation", "_IngestedDate") \
        .alias("person_detail")

    jobs_df = spark.readStream.table("raw_jobs") \
        .select("job_id", "name", "_Operation", "_IngestedDate")

    # Unions the jobs with a staging table containing an 'Unknown' job, so the -1 ids above still join to a record
    jobs_full_df = jobs_df \
        .union(spark.readStream.table("staging_data_jobs")
               .withColumn("_IngestedDate", lit("2023-11-13").cast("date"))) \
        .alias("jobs")

    # Join the three sources and take the latest ingested date across them as the sequencing column
    full_df = person_df.join(person_detail_df, on="person_id") \
        .join(jobs_full_df, on="job_id") \
        .selectExpr("person.person_id", "person.first_name", "person.surname",
                    "person.date_of_birth", "person.postcode", "jobs.name as job",
                    "person_detail.workarea", "person._Operation",
                    "person_detail._IngestedDate AS CDIngestedDate",
                    "jobs._IngestedDate AS OCIngestedDate",
                    "person._IngestedDate AS CUIngestedDate") \
        .withColumn("_IngestedDate", greatest("CDIngestedDate", "OCIngestedDate", "CUIngestedDate"))

    return full_df.select("person_id", "first_name", "surname", "date_of_birth",
                          "postcode", "job", "workarea", "_Operation", "_IngestedDate")
dlt.create_streaming_table("person")

dlt.apply_changes(
    target = "person",
    source = "dim_person",
    keys = ["person_id"],
    sequence_by = col("_IngestedDate"),
    apply_as_deletes = expr("_Operation = 1"),
    except_column_list = ["_IngestedDate"],
    stored_as_scd_type = 2
)
If I run this through a pipeline and, for example, one of the job IDs has been updated in the person_detail table, it breaks because the schema has changed. Is there a better way of doing SCD type 2 dimensions when joining multiple tables?
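The only alternative I've come up with so far is to keep person as the only streaming source and switch the other two tables to plain batch reads (a stream-static join), roughly like the sketch below (same table names as above; I haven't actually tested this):

# Rough, untested sketch of the stream-static idea; table and column names
# are the same as in the view above.
@dlt.view
def dim_person():
    # Driving table stays as a stream
    person_df = spark.readStream.table("raw_person") \
        .filter("person_type_id = 2") \
        .alias("person")

    # Secondary tables switched to plain batch reads, re-read in full on each update
    person_detail_df = spark.read.table("raw_person_detail") \
        .na.fill(value=-1, subset=["job_id"]) \
        .select("person_id", "job_id", "workarea", "_IngestedDate") \
        .alias("person_detail")

    jobs_df = spark.read.table("raw_jobs") \
        .select("job_id", "name") \
        .alias("jobs")

    # Same join pattern as before (the staging 'Unknown' union is omitted here for brevity),
    # with the sequencing column taken only from the streaming side
    return person_df.join(person_detail_df, on="person_id") \
        .join(jobs_df, on="job_id") \
        .selectExpr("person.person_id", "person.first_name", "person.surname",
                    "person.date_of_birth", "person.postcode", "jobs.name as job",
                    "person_detail.workarea", "person._Operation",
                    "person._IngestedDate AS _IngestedDate")

My worry with that approach is that new dimension rows would then only be produced when the person record itself changes, so updates that happen only in person_detail or jobs would be missed, which is more or less the same limitation I'm trying to get away from.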
Any suggestions much appreciated.