Hello community,
I am using the dlt.apply_changes function to implement SCD2, and I am specifying the schema of the streaming table that apply_changes() should produce.
This schema contains a generated column.
Somehow, on the first run my DLT pipeline always produces the streaming table with the generated column set to null.
Whenever I fully refresh the pipeline, the generated column is computed correctly.
Is there any explanation for why this problem arises only on the first run?
How can I avoid it? In the testing phase of my CI pipeline I want to destroy this pipeline (using a Databricks Asset Bundle) and launch it from scratch, roughly as sketched below.
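For reference, the CI testing phase runs roughly the following (the target name and the pipeline resource key are placeholders):

databricks bundle destroy -t test --auto-approve   # tear down the previous deployment
databricks bundle deploy -t test                   # redeploy the pipeline from scratch
databricks bundle run silver_pipeline -t test      # trigger the first (problematic) run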
Below you can find my pipeline code:
import dlt
from pyspark.sql import functions as F

# Target streaming table for apply_changes; is_current is a generated column
# derived from the SCD2 end timestamp.
dlt.create_streaming_table(
    name="silver_table",
    schema="""row_id STRING NOT NULL,
        col_a STRING NOT NULL,
        `__START_AT` TIMESTAMP NOT NULL,
        `__END_AT` TIMESTAMP,
        last_updated TIMESTAMP,
        is_current BOOLEAN NOT NULL GENERATED ALWAYS AS (CASE WHEN `__END_AT` IS NULL THEN true ELSE false END)
    """,
    cluster_by=["col_a"],
    comment="scd2 table in silver layer",
)
# SCD2 merge of the CDC feed into the silver table, sequenced by the sync timestamp.
dlt.apply_changes(
    source="data_input_cdc",
    target="silver_table",
    keys=["row_id"],
    sequence_by=F.col("synced"),
    except_column_list=[
        "synced",
        "record_deleted",
    ],
    stored_as_scd_type=2,
    apply_as_deletes=F.expr("record_deleted = true"),
)
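For context, data_input_cdc is a streaming view along these lines; the bronze table name and the column selection here are simplified placeholders, not my exact ingestion logic:

@dlt.view(name="data_input_cdc")
def data_input_cdc():
    # Placeholder bronze source; the real feed carries at least the key,
    # payload, sequencing, and soft-delete columns referenced above.
    return spark.readStream.table("bronze.cdc_feed").select(
        "row_id", "col_a", "last_updated", "synced", "record_deleted"
    )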