Hello, I'm working on a DLT pipeline to build what would be a data warehouse / data mart. I'm facing issues trying to "update" my fact table when dimensions maintained outside the pipeline are not up to date at processing time, so on the next run I'm trying to update the dimension reference ids once I'm able to get a match.
My current approach uses two materialized views:
# Command 1:
import dlt
from pyspark.sql.functions import col

@dlt.table
def vw_fact():
    df = spark.read.table("my_view")
    df_dim_1 = spark.read.table(dim_table_1)
    df_dim_2 = spark.read.table(dim_table_2)  # Dimension table maintained externally from the DLT pipeline
    df_joined = (
        df.alias("fact")
        .join(
            df_dim_1.alias("dim_1"),
            on=col("fact.ref_dim1_code") == col("dim_1.code_dim_1"),
            how="left"
        )
        .join(
            df_dim_2.alias("dim_2"),
            on=col("fact.ref_dim2_code") == col("dim_2.code_dim_2"),
            how="left"
        )
        .select(
            col("fact.id"),
            col("fact.col_1"),
            col("fact.col_2"),
            col("fact.col_3"),
            ...,
            col("dim_1.id_dim_1"),
            col("dim_2.id_dim_2"),  # From the externally maintained dimension; this id may not exist yet at the time of the first processing
            ...,
            col("fact.col_9"),
            col("fact.col_10"),
            col("fact.inserted_timestamp")
        )
    )
    return df_joined
# Command 2:
table_args = {
    "name": destination_fact,
    "table_properties": {"quality": "silver"},
    "comment": "Silver table to store Fact..."
}

dlt.create_streaming_table(**table_args)

dlt.apply_changes(
    target=destination_fact,
    source="vw_fact",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1"
)
# Command 3:
# The intention here is to pick up the Dim 2 ids that arrived since the last run,
# i.e. ids that were not yet present when Commands 1 and 2 last executed
@dlt.table(name="fix_missing_dim_2")
def fix_missing_dim_2():
    df_fact = spark.read.table(destination_fact)
    df_dim_2 = spark.read.table(dim_table_2)
    return (
        df_fact.alias("fact")
        .join(
            df_dim_2.alias("dim_2"),
            on=col("fact.ref_dim2_code") == col("dim_2.code_dim_2"),
            how="inner"
        )
        .filter(col("fact.id_dim_2").isNull() & col("dim_2.id_dim_2").isNotNull())
        .select(
            col("fact.id"),
            col("dim_2.id_dim_2"),
            col("fact.inserted_timestamp")
        )
    )
# Command 4:
dlt.apply_changes(
    target=destination_fact,
    source="fix_missing_dim_2",
    keys=["id"],
    sequence_by=col("inserted_timestamp"),
    stored_as_scd_type="1"
)
But this generates the following error:
AnalysisException: Cannot have multiple queries named `catalog`.`schema`.`destination_fact` for `catalog`.`schema`.`destination_fact`. Additional queries on that table must be named. Note that unnamed queries default to the same name as the table
Then I ran into "append flows", where apply_changes is also mentioned as a type of flow, and the documentation states this:
"...Multiple apply changes flows can target a single streaming table. A streaming table that acts as a target for an apply changes flow can only be targeted by other apply changes flows...."
But there are no concrete examples of its usage.
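For reference, this is roughly the naming pattern I understand from the append-flow docs, with placeholder names (append_target, source_a, source_b, and the flow names are made up for illustration): each additional flow writing into the same streaming table carries an explicit name so the flows don't collide.

import dlt

dlt.create_streaming_table(name="append_target")

# One flow into the shared streaming table
@dlt.append_flow(target="append_target", name="source_a_flow")
def source_a_flow():
    return spark.readStream.table("source_a")

# A second flow into the same table, distinguished by its own flow name
@dlt.append_flow(target="append_target", name="source_b_flow")
def source_b_flow():
    return spark.readStream.table("source_b")

That makes sense for append flows, but I don't see how to do the equivalent for a second apply_changes flow against destination_fact: apply_changes is a plain function call with no decorated function to name, and I haven't found a documented name argument for it.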
Any help would be highly appreciated!